RxJava2知识(1) - Rxjava介绍及基本使用

简介

Rxjava是一个基于事件流实现异步操作的库,相比较AsyncTask、Handler更为简洁。
RxJava的异步实现是通过一种扩展的观察者模式来实现的。

特点

RxJava基于事件流的链式调用,逻辑简洁,实现优雅,使用简单方便,随着程序逻辑的复杂性提高依旧可以很简洁

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

基本实现方式

分步骤实现

  • 1.创建被观察者(Observable)-生产事件,就是决定事件触发时候的行为
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        // create() 是 RxJava 最基本的创造事件序列的方法
        // 此处传入了一个 OnSubscribe 对象参数
        // 当 Observable 被订阅时,OnSubscribe 的 call()方法会自动被调用,即事件序列就会依照设定依次被触发
        // 即观察者会依次调用对应事件的复写方法从而响应事件
        // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

        //在复写的subscribe()里定义需要发送的事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // 通过 ObservableEmitter类对象产生事件并通知观察者
            // ObservableEmitter类介绍
            // a. 定义:事件发射器
            // b. 作用:定义需要发送的事件 & 向观察者发送事件
            emitter.onNext(1);
            emitter.onComplete();
        }
    });

    //扩展:RxJava 提供了其他方法用于 创建被观察者对象Observable
    // 方法1:just(T...):直接将传入的参数依次发送出来
    Observable observable1 = Observable.just("A", "B", "C");
    // 将会依次调用:
    // onNext("A");
    // onNext("B");
    // onNext("C");
    // onCompleted();

    // 方法2:from(T[]) / from(Iterable<? extends T>) : 将传入的数组 / Iterable 拆分成具体对象后,依次发送出来
    String[] words = {"A", "B", "C"};
    Observable observable2 = Observable.fromArray(words);
    // 将会依次调用:
    // onNext("A");
    // onNext("B");
    // onNext("C");
    // onCompleted();
  • 2.创建观察者(Observer)- 定义相应事件的行为
    Observer<Integer> observer = new Observer<Integer>() {
        //创建对象时通过复写对应事件方法 从而响应对应事件

        //观察者接收事件前先调用复写onSubscribe
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
        }

        // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "对Next事件作出响应" + value);
        }
        // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "对Error事件作出响应");
        }

        // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
        @Override
        public void onComplete() {
            Log.d(TAG, "对Complete事件作出响应");
        }
    };
  • 3.订阅(Subscribe)连接观察者和被观察者
observable.subscribe(observer);

基于事件流的链式调用

实际开发中这样写更简洁美观
Observable.create(new ObservableOnSubscribe<Integer>() {
        // 1. 创建被观察者 & 生产事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"作出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }

        });
    }
}

注:整体方法调用顺序:观察者.onSubscribe()> 被观察者.subscribe()> 观察者.onNext()>观察者.onComplete()

线程调度

  • subScribeOn
    用于指定 subscribe() 时所发生的线程
  • observeOn
    observeOn 方法用于指定下游 Observer 回调发生的线程。

简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

RxJava 中,已经内置了很多线程选项供我们选择,例如有:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
  • Schedulers.newThread() 代表一个常规的新线程;
  • AndroidSchedulers.mainThread() 代表Android的主线程
讨论数量: 1

看看咱的实力,清晰的叙述脉络,悦目的排版形式,太棒了!

4年前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!