RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Posted by LXG on July 9, 2020

RxJava-Github

这可能是最好的RxJava 2.x 教程-简书

RxJava-Carson_Ho简书

RxJava-Carson_Ho-Github

概念

首先知道什么是RxJava,Rx是ReactiveX的缩写,而ReactiveX是Reactive Extensions的缩写。RxJava顾名思义即使Java上的异步和基于事件响应式编程库

RxJava基于观察者模式,主要有四个部分:观察者、被观察者、订阅、事件

RxJava就是在观察者模式的骨架下,通过丰富的操作符和便捷的异步操作来完成对于复杂业务的处理

rxjava

build.gradle


    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.0'

FlowableSubscriber、Flowable


        // 观察者
        FlowableSubscriber<String> subscriber = new FlowableSubscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        };


        // 被观察者
        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {

            }
        }, BackpressureStrategy.BUFFER);// 背压策略

        // 订阅多个观察者
        flowable.subscribe(subscriber);

Observable、Observer


        // 被观察者
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            }
        }).subscribe(new Observer<String>() { // 观察者
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

对比

Observable不支持背压,而Flowable支持背压

使用Observable - 不超过1000个元素、随着时间流逝基本不会出现OOM - GUI事件或者1000Hz频率以下的元素 - 平台不支持Java Steam(Java8新特性) - Observable开销比Flowable低

使用Flowable - 超过10k+的元素(可以知道上限) - 读取硬盘操作(可以指定读取多少行) - 通过JDBC读取数据库 - 网络(流)IO操作

背压 BackPressure

所谓背压就是生产者(被观察者)的生产速度大于消费者(观察者)消费速度从而导致的问题

Single、SingleObserver

如果你使用一个单一连续事件流,即只有一个onNext事件,接着就触发onComplete或者onError,这样你可以使用Single

Single只包含两个事件,一个是正常处理成功的onSuccess,另一个是处理失败的onError,它只发送一次消息(当然就不存在背压问题),其中Single类似于Observable


        // 被观察者
        Single<String> single = Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {

            }
        });

        // 订阅观察者
        single.subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {

            }

            @Override
            public void onError(Throwable e) {

            }
        });

线程调度

  1. observeOn 方法用于指定下游 Observer 回调发生的线程
  2. 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略
  3. 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次

        single.subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(String s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

RxJava 中,已经内置了很多线程选项供我们选择,例如有:

  1. Schedulers.io() 代表io操作的线程, 通常用于网络, 读写文件等io密集型的操作
  2. Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  3. Schedulers.newThread() 代表一个常规的新线程
  4. AndroidSchedulers.mainThread() 代表Android的主线程

操作符

工作原理图

rxjava_map

map

采用 map 操作符进行网络数据解析


        Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {

            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .map(new Function<String, Response>() {
                    @Override
                    public Response apply(String s) throws Exception {
                        // 模拟网络数据解析
                        return JSON.parseObject(s, Response.class);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // 切换到主线程更新UI
                .subscribe(new SingleObserver<Response>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onSuccess(Response s) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

rxjava_map

concatMap

采用 concat 操作符先读取缓存再通过网络请求获取数据

flatMap

flatMap 实现多个网络请求依次依赖

zip

善用 zip 操作符,实现多个接口数据共同更新 UI

interval

采用 interval 操作符实现心跳间隔任务