RxJava2知识(1) - Rxjava介绍及基本使用
简介
Rxjava是一个基于事件流实现异步操作的库,相比较AsyncTask、Handler更为简洁。
RxJava的异步实现是通过一种扩展的观察者模式来实现的。
特点
RxJava基于事件流的链式调用,逻辑简洁,实现优雅,使用简单方便,随着程序逻辑的复杂性提高依旧可以很简洁
RxJava 的观察者模式
RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
- 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
基本实现方式
分步骤实现
- 1.创建被观察者(Observable)-生产事件,就是决定事件触发时候的行为
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// create() 是 RxJava 最基本的创造事件序列的方法
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call()方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
//在复写的subscribe()里定义需要发送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通过 ObservableEmitter类对象产生事件并通知观察者
// ObservableEmitter类介绍
// a. 定义:事件发射器
// b. 作用:定义需要发送的事件 & 向观察者发送事件
emitter.onNext(1);
emitter.onComplete();
}
});
//扩展:RxJava 提供了其他方法用于 创建被观察者对象Observable
// 方法1:just(T...):直接将传入的参数依次发送出来
Observable observable1 = Observable.just("A", "B", "C");
// 将会依次调用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
// 方法2:from(T[]) / from(Iterable<? extends T>) : 将传入的数组 / Iterable 拆分成具体对象后,依次发送出来
String[] words = {"A", "B", "C"};
Observable observable2 = Observable.fromArray(words);
// 将会依次调用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
- 2.创建观察者(Observer)- 定义相应事件的行为
Observer<Integer> observer = new Observer<Integer>() {
//创建对象时通过复写对应事件方法 从而响应对应事件
//观察者接收事件前先调用复写onSubscribe
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
- 3.订阅(Subscribe)连接观察者和被观察者
observable.subscribe(observer);
基于事件流的链式调用
实际开发中这样写更简洁美观
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 创建被观察者 & 生产事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件"+ value +"作出响应" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
}
注:整体方法调用顺序:观察者.onSubscribe()> 被观察者.subscribe()> 观察者.onNext()>观察者.onComplete()
线程调度
- subScribeOn
用于指定 subscribe() 时所发生的线程 - observeOn
observeOn 方法用于指定下游 Observer 回调发生的线程。
简单地说,subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。
RxJava 中,已经内置了很多线程选项供我们选择,例如有:
- Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作;
- Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作;
- Schedulers.newThread() 代表一个常规的新线程;
- AndroidSchedulers.mainThread() 代表Android的主线程
看看咱的实力,清晰的叙述脉络,悦目的排版形式,太棒了!