RxJava

RxJava,一个大家最近吵的很火热的词.

RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.

大约就是这个意思:RxJava是一个通过观察队列实现的异步的,基于事件的在JVM的库


关于ReactiveX

Rx = Observables + LINQ + Schedulers。

使用观察者模式

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据流并执行操作

简化代码

  • 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
  • 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
  • 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

观察者模式

传统的观察者模式中,例如:监视.
在一个地方警察蹲点看小偷,这里警察就是观察者,小偷就是被观察者,警察目不转睛,不停的看着小偷的一举一动,直到小偷开始作案.

1
2
3
4
5
6
7
8
9
boolean state=true;
while(state){
//TODO 看小偷是否偷东西
if(小偷开始偷){
sleep(10);
state=false;
//TODO 执行抓捕
}
}

程序中的观察者模式,采用注册/订阅的模式.

扩展的观察者模式

观察者模式就不用多说了,大家都懂的,android中最常见的就是view.setOnClickListener().不过这里的观察者,通过注册(register)或者订阅(Subscribe)的方式实现.

RxJava 的观察者模式有是个基本的状态

  • Observable(可观察者)
  • Observer(观察者)
  • subscribe(订阅)
  • 事件

不同意传统的观察者模式,onNext()[onClick()/onEvent()]这里增加两个:

  • onCompleted()
  • onError()

onCompleted():事件队列完成,RxJava不仅把每个事件都单独处理,还把他们当成一个队列处理,当不再有新的onNext()出现的时候,就得调用onCompleted()

onError():事件队列异常,onError会触发,同时队列自动终止,不允许再有事件发出

onCompleted()和onError()是互斥的.

实现

  1. 创建Observer(观察者)
  2. 创建Observable(被观察者)
  3. 创建Subscribe(订阅)

Observer(观察者)

观察者决定事件触发的时候会有这么样的行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}

@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}

@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的.

实质上,在RxJava中,Observer也总是会被转成Subscriber.

他们的区别:

  1. Subscriber有新增加的方法onStart(),它会执行在subscribe刚开始,这个时候可以做准备工作,比如数据初始化.但是注意,subscribe会执行在任何线程,所以要操作UI线程的东西,比如弹出加载的dialog之类的,是不可以的.除非使用doOnSubscribe()方法.
  2. unsubscribe()这个方法,用于取消订阅,Subscriber实现的另一个接口Subscription的方法,这个方法在调用以后,Subscriber就不会再接收事件了.所以在调用这个方法的时候,先调用isUnsubscribed(),这个方法很重要,因为在 subscribe() 之后, Observable 会持有Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

创建 Observable

Observable是被观察者,它决定什么时候触发事件,和触发什么事件.RxJava使用create()方法创建一个Observable,然后定义事件的触发规则.

1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("John");
subscriber.onCompleted();
}
);

可以看到,传入了OnSubscribe对象,它被存储到了返回的Observable对象中,作用就是一个依次执行的计划表.当Observable被订阅的时候,call()方法就会调用.被观察者调用了观察者3次onNext,1次onCompleted().
以上代码还可以这样写:

1
Observable observable = Observable.just("Hello", "Hi", "John");

这样写

1
2
String[] words = {"Hello", "Hi", "John"};
Observable observable = Observable.from(words);

Subscribe订阅

创建了Observer和Observable之后,关联起来,就可以了

1
2
observable.subscribe(observer);
observable.subscribe(subscriber);

Observable.subscribe(Subscriber)实现的伪代码

1
2
3
4
5
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}

做了三件事情

  1. 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer ,这样更加严谨。

调度器(Scheduler)

首先,rxJava中,所有的活动的线程是不变的,调用subscribe()的线程,会执行所有的事情,如果需要切换线程,就需要调用scheduler.

概念

Scheduler是一个线程控制器,来决定在那个线程执行.

  • Schedulers.immediate() 直接在当前线程运行,这个是默认的选项.
  • Schedulers.newThread() 总是启动新的线程,并且在新的线程中执行.
  • Schedulers.io() 专门服务IO的Scheduler,包括文件数据库的读写,网络数据的交互,本质是一个无数量上线的线程池,可以重用空闲的线程,比newThread()更高效.
  • Schedulers.computation() 计算用的Scheduler,使用了固定的线程池,大小为CPU的核数,
  • AndroidSchedulers.mainThread() 大家都懂,就不说了

线程控制

  • subscribeOn() Observable.OnSubscribe被激活时在的线程,事件产生的线程.
  • observeOn() Subscriber所运行在的线程,事件消费线程.
1
2
3
4
5
6
7
8
9
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {

Log.d(tag, "number:" + number);
}
});

被创建事件的内容1,2,3,4将会在IO线程发出,但是数字打印将会在主线程中执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});

subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread())
就是典型的异步获取数据的方法,来保证前台的快速响应.