概念
首先知道什么是RxJava,Rx是ReactiveX的缩写,而ReactiveX是Reactive Extensions的缩写。RxJava顾名思义即使Java上的异步和基于事件响应式编程库
RxJava基于观察者模式,主要有四个部分:观察者、被观察者、订阅、事件
RxJava就是在观察者模式的骨架下,通过丰富的操作符和便捷的异步操作来完成对于复杂业务的处理
build.gradle
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
FlowableSubscriber、Flowable
// 观察者
FlowableSubscriber<String> subscriber = new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
};
// 被观察者
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
}
}, BackpressureStrategy.BUFFER);// 背压策略
// 订阅多个观察者
flowable.subscribe(subscriber);
Observable、Observer
// 被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
}
}).subscribe(new Observer<String>() { // 观察者
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
对比
Observable不支持背压,而Flowable支持背压
使用Observable - 不超过1000个元素、随着时间流逝基本不会出现OOM - GUI事件或者1000Hz频率以下的元素 - 平台不支持Java Steam(Java8新特性) - Observable开销比Flowable低
使用Flowable - 超过10k+的元素(可以知道上限) - 读取硬盘操作(可以指定读取多少行) - 通过JDBC读取数据库 - 网络(流)IO操作
背压 BackPressure
所谓背压就是生产者(被观察者)的生产速度大于消费者(观察者)消费速度从而导致的问题
Single、SingleObserver
如果你使用一个单一连续事件流,即只有一个onNext事件,接着就触发onComplete或者onError,这样你可以使用Single
Single只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError,它只发送一次消息(当然就不存在背压问题),其中Single类似于Observable
// 被观察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
}
});
// 订阅观察者
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
});
线程调度
- observeOn 方法用于指定下游 Observer 回调发生的线程
- 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略
- 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次
single.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
}
@Override
public void onError(Throwable e) {
}
});
RxJava 中,已经内置了很多线程选项供我们选择,例如有:
- Schedulers.io() 代表io操作的线程, 通常用于网络, 读写文件等io密集型的操作
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
- Schedulers.newThread() 代表一个常规的新线程
- AndroidSchedulers.mainThread() 代表Android的主线程
操作符
map
采用 map 操作符进行网络数据解析
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> emitter) throws Exception {
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map(new Function<String, Response>() {
@Override
public Response apply(String s) throws Exception {
// 模拟网络数据解析
return JSON.parseObject(s, Response.class);
}
})
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新UI
.subscribe(new SingleObserver<Response>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Response s) {
}
@Override
public void onError(Throwable e) {
}
});
concatMap
采用 concat 操作符先读取缓存再通过网络请求获取数据
flatMap
flatMap 实现多个网络请求依次依赖
zip
善用 zip 操作符,实现多个接口数据共同更新 UI
interval
采用 interval 操作符实现心跳间隔任务