RxJava基本知识
RxJava是一个在JVM上使用可观测的序列来组成异步的、基于事件的程序的库。
通常情况下使用Handler、AsyncTask
完成异步任务操作,当任务比较多的时候,使用这种方式就会变得相当繁琐,尤其是嵌套式的执行任务(需要上一项先完成)。
RxJava可以实现异步任务执行的前提下保持代码的清晰。通过创建一个Observable
来完成异步任务,然后组合各种操作符实现不同的链式操作,最终将任务直执行结果发射到Observer
进行处理。
简单示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1 ); emitter.onNext(2 ); emitter.onComplete(); } }); observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe (Disposable d) { System.err.println("onSubscribe" ); } @Override public void onNext (Integer integer) { System.out.println("onNext()" +integer); } @Override public void onError (Throwable e) { System.err.println("onError" ); } @Override public void onComplete () { System.err.println("onComplete" ); } }); 输出结果: onSubscribe onNext1 onNext2 onComplete
Observable
发射出一个事件,通过subscribe()
建立与Observer
的关联,然后回调到具体事件内(onNext()、onError()、onComplete()
)。
除了Observable
外,还提供了适用其他场景的基础类:
事件类型 Observable
是通过ObservableEmitter
来发射事件的,可以发出的事件类型有三种:onNext()、onError(),onComplete()
。
onNext()
对应Observer.onNext()
,用于发射数据
onComplete()
对应Observer.onComplete()
,用于发射完成通知
onError()
对应Observer.onError()
,用于发射异常通知
他们三者之间满足一定的规则:
ObservableEmitter
可以发射无数的onNext()
事件,Observer
也可以处理无数onNext()
事件
当ObservableEmitter
发射出onComplete()
事件,Observer
接收到这个事件后,不会再继续接收后续事件 。
当ObservableEmitter
发射出onError()
事件,Observer
接收到这个事件后,不会再继续接收后续事件 。
onComplete()
与onError()
是唯一且互斥 的。不能发射分多个onComplete()
或onError()
,也不能发射onComplete()
后再跟上onError()
。
取消订阅 通过Disposable
实现,在Observer
的onSubsccribe()
中返回对象,他有两个方法isDisposed()
——判断是否停止了观察指定的流,dispose()
——放弃观察指定的流。
执行Disposeable.dispose()
后,并不会影响ObservableEmitter
继续发送事件,但是Observer
中不会继续接收事件,相当于切断了两者的联系。
RxJava操作符 如上节示例所示,通过Observable.create()
就可以创建出一个Observable
对象,除了create
外,还有其他的操作符方便我们完成链式操作,下面详解介绍这些操作符。
创建操作符
用于创建Observable对象,并填充数据
create
用于从头到尾创建一个Observable对象,发射器内部会调用onNext()、onComplete(),onError()
方法
1 2 3 4 5 6 7 8 9 10 public static <T> Observable<T> create (ObservableOnSubscribe<T> source) Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1 ); emitter.onNext(2 ); emitter.onComplete(); } });
不推荐使用,使用较为繁琐且不方便操作。
from系列
从指定的数据源中获取一个Observable对象
fromArray(T... items)
:从数组中获取
fromCallable(Callable<? extends T> supplier)
:从Callable对象中获取
fromFuture(Future<? extends T> future)
:从Future对象中获取
fromIterable(Iterable<? extends T> source)
:从Iterable中获取
formPublisher(Publisher<? extends T> publisher)
:从Publisher中获取
1 Observable<Integer> observable = Observable.fromArray(1 ,2 ,3 ,4 );
just
支持发送不同格式的参数,个数从1~10个
1 2 3 public static <T> Observable<T> just (T item) Observable<Integer> observable = Observable.just(1 ,2 );
interval&intervalRange
每隔指定时间就发射一个Observable对象
interval
1 2 3 4 5 6 7 8 9 10 11 12 public static Observable<Long> interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler) 其他重载方法:public static Observable<Long> interval (long initialDelay, long period, TimeUnit unit) public static Observable<Long> interval (long period, TimeUnit unit) Observable observable = Observable.interval(1 , TimeUnit.SECONDS);
intervalRange
:控制发射的序列在一个范围之内,发射最后一位时停止
1 2 3 4 5 6 7 public static Observable<Long> intervalRange (long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) Observable<Long> observable =Observable.intervalRange(1 ,10 ,1 ,1 , TimeUnit.SECONDS, Schedulers.trampoline());
range&rangeLong
发射指定范围内的连续整数的Observable对象
1 2 3 4 5 6 7 8 9 public static Observable<Integer> range (final int start, final int count) Observable<Integer> observable = Observable.range(1 ,5 );public static Observable<Long> rangeLong (long start, long count) Observable<Long> observable = Observable.range(1 ,5 );
range
与rangeLong
的区别在于:前者返回int,后者返回long
defer
直到有观察者订阅是才创建的Observable
对象,并未每一个订阅者都创建一个新的Observable
对象。
1 2 3 public static <T> Observable<T> defer (Callable<? extends ObservableSource<? extends T>> supplier) Observable<Long> deferObservable = Observable.defer(() -> Observable.just(11L ));
可以使用defer
封装需要被多次执行的函数
empty & never & error empty
:创建一个不发射任何数据但是能正常终止的Observable
对象
1 2 3 public static <T> Observable<T> empty () Observable<Integer> observable = Observable.empty();
error
:创建一个不发射任何数据但是能发出错误的Observable
对象
1 2 3 public static <T> Observable<T> error (Callable<? extends Throwable> errorSupplier) Observable<Integer> observable = Observable.error(new IllegalArgumentException("qwe" ));
never
:创建一个不发射任何数据也不能终止的Observable
对象
1 2 3 public static <T> Observable<T> never () Observable<Integer> observable = Observable.never()
repeat&repeatWhen
并不是创建一个Observable
对象,而是重复发射原始的Observable
数据序列,可能是无限的,如果设置repeat(n)
可以限制重复次数
1 2 3 4 5 6 public final Observable<T> repeat () public final Observable<T> repeat (long times) Observable<Integer> observable = Observable.just(1 ).repeat(3 );
repeatWhen
:重复执行同一操作
1 2 3 public final Observable<T> repeatWhen (final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler) Observable<Long> observable = Observable.just(1L ).repeatWhen(objectObservable -> Observable.just(1 ).repeat(3 ));
timer
创建一个在制定延迟时间后发射一条数据的Observable
对象
1 2 3 4 5 6 public static Observable<Long> timer (long delay, TimeUnit unit) Observable<Long> observable = Observable.timer(3 ,TimeUnit.SECOND);
变换操作符 map
将发射的原始Observable
对象转成一个新的Observable
对象。
1 2 3 4 5 6 7 public final <R> Observable<R> map (Function<? super T, ? extends R> mapper) Observable<String> observable = Observable.just(1L ).map(aLong -> aLong.toString());
在定义的Function()
中进行数据类型转换
flatMap
将一个发送事件的原始Observable
变换为多个发送事件的Observables
,然后把这些合并到一个单独的Observable
里。flatMap
并不保证事件的顺序,实现的是一对多的转化
1 2 3 4 5 public final <R> Observable<R> flatMap (Function<? super T, ? extends ObservableSource<? extends R>> mapper) Observable<String> observable = Observable.just(1L ,2L ,3L ,4L ,5L ) .flatMap((Function<Long, ObservableSource<String>>) aLong -> Observable.just("flatMap" +aLong.toString()));
concatMap
将一个发送事件的原始Observable
变换为多个发送事件的Observables
,然后把这些合并到一个单独的Observable
里。concatMap可以保证事件的顺序,实现的是一对多的转化
1 2 3 4 5 public final <R> Observable<R> concatMap (Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) Observable<String> observable = Observable.just(1L ,2L ,3L ,4L ,5L ) .concatMap((Function<Long, ObservableSource<String>>) aLong -> Observable.just("flatMap" +aLong.toString()));
在使用concatMap
的时候,内部会新建一个队列对象,会把先发射的数据放入队列中,内部是利用了AtomicInteger
实现自增,然后输出数据的时候就可以按照顺序。
cast
将原始的Observable
对象中的所有数据都强制转换为一个指定的类型(多态,只能由父类对象转为子类对象 ),然后发射数据
1 2 3 4 public final <U> Observable<U> cast (final Class<U> clazz) Observable<String> observable = Observable.just(1L ).cast(String.class ) ;
flatMapIterable
将原始的任意一个元素转换成一个Iterable
对象,直接在内部以Iterable
接口接收数据。
1 2 3 4 5 6 7 8 9 10 11 public final <U> Observable<U> flatMapIterable (final Function<? super T, ? extends Iterable<? extends U>> mapper) Observable<String> observable = Observable.just(1L ,2L ,3L ,4L ,5L ) .flatMapIterable(new Function<Long, Iterable<? extends String>>() { @Override public Iterable<? extends String> apply(Long aLong) throws Exception { ArrayList<String> list = new ArrayList<>(); list.add(aLong.toString()); return list; } });
buffer
对整个流进行分组,将原先的Observable
对象转换为一个新的Observable
对象,每次发生一组值,而不是原先的一个个发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final Observable<List<T>> buffer(int count, int skip)public final Observable<List<T>> buffer(int count) Observable<String> observable = Observable.range(1 , 20 ) .buffer(3 , 1 ) .map(new Function<List<Integer>, String>() { @Override public String apply (List<Integer> lists) throws Exception { return lists.toString(); } });
window
与buffer
功能类似,但它发射出的是Observable
,这些Observable
在单独发射数据,都发送完毕后,发出onComplete()
通知。
1 2 3 4 public final Observable<Observable<T>> window(long count) Observable.range(1 , 10 ).window(3 ).subscribe( observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));
过滤操作符 filter
根据指定规则对发送数据进行过滤
1 2 3 4 5 6 7 8 9 public final Observable<T> filter (Predicate<? super T> predicate) Observable.range (1 ,10 ) .filter (new Predicate<Integer>() { @Override public boolean test (Integer integer) throws Exception { return integer>5 ; } })
elementAt&firstElement&lastElement
elementAt
:用于获取数据源中的指定位置的数据
firstElement
:用于获取数据源中的第一个元素
lastElement
:用于获取数据源中的最后一个元素
1 2 3 public final Maybe<T> elementAt (long index) Observable.range (0 , 5 ) .elementAt (3 ) ;
distinct&distinctUntilChanged
distinct
:对数据源中的重复数据进行过滤
distinctUntilChanged
:只会过滤相邻的两个相同元素
1 2 3 4 public final Observable<T> distinct () Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .distinct () => 1 ,2 ,3 ,4 ,5 ,6 Observable.just(1 ,2 ,3 ,4 ,5 ,6 ,5 ).distinctUntilChanged() => 1 ,2 ,3 ,4 ,5 ,6 ,5
skip&skipLast
skip
:对数据源中的前几项进行过滤
skipLast
:对数据源中的后几项进行过滤,以及最后的一段时间不发射数据。
1 2 3 4 5 6 7 8 9 public final Observable<T> skip (long count) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .skip (3 ) => 4 ,5 ,6 ,5 public final Observable<T> skip (long time, TimeUnit unit) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .skip (1 ,TimeUnit.SECONDS) => 1 ,2 ,3 ,4 ,5 ,6 ,5 public final Observable<T> skipLast (long count) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .skipLast (3 ) => 1 ,2 ,3 ,4
take&takeLast
take
:取出数据源的前几项数据
takeLast
:取出数据源的后几项数据
1 2 3 4 5 6 7 8 public final Observable<T> take (long count) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .take (3 ) =>1 ,2 ,3 public final Observable<T> take (long time, TimeUnit unit) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .repeat (1 ) .take (100 ,TimeUnit.MILLISECONDS) => 1 ,2 ,3 ,4 ,5 ,6 ,5 public final Observable<T> takeLast (long count) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .takeLast (3 )
ignoreElements
过滤所有Observable对象发射的数据,只允许onError()
或onComplete()
发送
1 2 3 4 5 6 7 8 9 10 public final Completable ignoreElements () Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .ignoreElements () .subscribe (new CompletableObserver() { @Override public void onSubscribe (Disposable disposable) { } @Override public void onComplete () { } @Override public void onError (Throwable throwable) { } });
throttleFirst & throttleLast&throttleLatest
throttleFirst
:对输出的数据进行限制,按照指定的参数对时间进行分片,然后选择每个时间片中的第一条数据
throttleLast
:对输出的数据进行限制,按照指定的参数对时间进行分片,然后选择每个时间片中的最后一条数据
throttleLatest
:读书处的数据进行限制,按照指定的参数对时间进行分片,然后选择每个时间片中的最近数据
1 2 3 4 5 6 7 8 9 10 public final Observable<T> throttleFirst (long skipDuration, TimeUnit unit, Scheduler scheduler) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .delay (50 ,TimeUnit.MILLISECONDS,Schedulers.trampoline() ). throttleFirst (100 ,TimeUnit.MILLISECONDS) => 1 ,3 ,5 ,5 public final Observable<T> throttleLast (long skipDuration, TimeUnit unit, Scheduler scheduler) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .delay (50 ,TimeUnit.MILLISECONDS,Schedulers.trampoline() ). throttleLast (100 ,TimeUnit.MILLISECONDS) => 2 ,4 ,6 ,5 public final Observable<T> throttleLatest (long skipDuration, TimeUnit unit, Scheduler scheduler) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .delay (50 ,TimeUnit.MILLISECONDS,Schedulers.trampoline() ). throttleLatest (100 ,TimeUnit.MILLISECONDS) => 1 ,3 ,5 ,6
throttleWithTomeOut
数据源发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射
1 2 3 4 public final Observable<T> throttleWithTimeout (long timeout, TimeUnit unit, Scheduler scheduler) Observable.just (1 ,2 ,3 ,4 ,5 ,6 ,5 ) .delay (120 ,TimeUnit.MILLISECONDS,Schedulers.trampoline() ). throttleWithTimeout (200 ,TimeUnit.MILLISECONDS) => 5
debounce
限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据。功能与throttleWithTimeout
相似
1 2 3 4 5 6 7 public final Observable<T> throttleWithTimeout (long timeout, TimeUnit unit) { return debounce(timeout, unit); } public final Observable<T> debounce (long timeout, TimeUnit unit) { return debounce(timeout, unit, Schedulers.computation()); }
组合操作符 startWith&startWithArray
startWith
:在指定数据源之前插入数据
startWithArray
:在指定数据之前插入数组
1 2 3 4 5 6 7 8 9 10 public final Observable<T> startWith (T item) public final Observable<T> startWith (Iterable<? extends T> items) Observable.just (1 ,2 ,3 ,4 ) .startWith (0 ) => 0 ,1 ,2 ,3 ,4 public final Observable<T> startWithArray (T... items) Observable.just (1 ,2 ,3 ,4 ) .startWithArray (0 ,0 ,0 ) => 0 ,0 ,0 ,1 ,2 ,3 ,4
merge&mergeArray
merge
:可以将多个数据源的数据合并起来进行发射,可能导致合并后的数据交错发射。
mergeArray
:插入的是一个Observable
数组
1 2 3 4 5 6 public static <T> Observable<T> merge (Iterable<? extends ObservableSource<? extends T>> sources) Observable<String > ob1 = Observable.just(1 ,2 ,3 ,4 ); Observable<String > ob2 = Observable.just(1 ,2 ,3 ,4 ); Observable.mergeArray(ob1,ob2)
mergeError
是一个特殊示例,会等到所有数据发射完毕,才抛出onError
事件
concat&concatArray
会将多个Observable
对象合并到一个Observable
对象中进行发送且严格按照顺序发射 。
{% fullimage /images/RxJava-concat.webp,conact操作符,conact操作符%}
1 2 3 Observable<String > ob1 = Observable.just(1 ,2 ,3 ,4 ); Observable<String > ob2 = Observable.just(1 ,2 ,3 ,4 ); Observable.concatArray(ob1,ob2)
zip&zipArray&zipIterable
按顺序结合两个或多个Observable
对象,然后返回结果。严格按照顺序进行发射,他的总发射数与数据项最少的Observable
对象数据相同。
1 2 3 Observable.zip(Observable.range(1 , 6 ), Observable.range(1 , 5 ),(integer, integer2) -> integer*integer2).subscribe(o -> System.err.println(o));1 ,4 ,9 ,16 ,25
{% fullimage /images/RxJava-zip.webp,zip操作符,zip操作符%}
combineLastest
当两个Observable中的任何一个发送了数据时,使用一个函数结合每个Observable
的最近数据项,并且基于这个结果发送数据。
{% fullimage /images/RxJava-combineLatest.png,combineLatest操作符,combineLatest操作符%}
1 2 3 4 Observable.combineLatest(Observable.range(1 , 6 ),Observable.range(1 ,5 ),(integer, integer2) -> integer*integer2).subscribe(o -> System.err.println(o));6 ,12 ,18 ,24 ,32
辅助操作符 delay
设置数据发射前的停顿时间
1 2 3 public final Observable<T> delay (long delay, TimeUnit unit) Observable.just (1 ,2 ,3 ,4 ) .delay (200 ,TimeUnit.MILLISECONDS)
do系列
用于监听数据回调
doAfterNext
:在onNext()
之后回调
doOnNext
:调用onNext()
时回调
doOnSubscribe
:观察者订阅时触发
doOnError
:调用onError()
时触发
doOnComplete
:调用onComplete()
时触发
doOnTerminate
:Observable终止前触发
doOnEach
:每次调用onNext()
时触发
doOnDispose
:调用Disposable.dispose()
时触发
doFinally
:调用onError()
或onComplete()
时触发
doAfterTerminate
:Observable终止后触发
subscribeOn
指定发射器所在的线程,即Observable.onScuscribe
的所处线程,或者叫做事件产生的线程
1 Observable.just(1 ).subscribeOn(Schedules.io)
observeOn
指定Subscriber所在线程,或者叫做事件消费的线程。
1 Observable.just(1 ).subscribeOn(Schedules.io).obServerOn(AndroidSchedulers.mainThread())
timeout
用来设置一个超时时间,如果在指定时间内没有任何数据被发送出来,就会执行指定的数据项。
1 2 3 4 5 6 7 Observable.just(1L ,2L ,3L ) .timeout(500 , TimeUnit.MILLISECONDS, Observable.rangeLong(1 , 5 )) .subscribe(System.out::print); 输出结果:123 Thread.sleep(2000 ); 输出结果:12345
错误处理操作符 catch
拦截原始的onError
通知,把它进行替换为其他的Observable对象,使原来的可以正常终止甚至不终止。
retry
实现了一种错误重试机制,再出现错误的时候进行重试,可以通过参数指定重试的条件。
{% fullimage /images/RxJava-retry.webp,retry操作符,retry操作符%}
条件操作符 all&any
all
:判断数据源中是否所有数据都满足指定要求,可以使用一个函数表示
any
:判断数据源中是否存在满足要求的数据
1 2 3 4 5 public final Single<Boolean> all (Predicate<? super T> predicate) Observable.just (1 ,2 ,3 ,4 ) .all (integer -> integer < 5 ) => true public final Single<Boolean> any (Predicate<? super T> predicate) Observable.just (1 ,2 ,3 ,4 ) .all (integer -> integer < 2 ) => true
contains&isEmpty
contains
:判断数据源中是否包含指定项
isEmpty
:数据源是否为空
1 2 3 4 5 public final Single<Boolean> contains (final Object element) Observable.just (1 ,2 ,3 ,4 ) .contains (2 ) => true public final Single<Boolean> isEmpty () Observable.just (1 ,2 ,3 ,4 ) .isEmpty () => false
sequenceEqual
用来判断两个Observable对象的发射序列是否相等(包含数据,发射顺序,终止状态等 )
1 2 3 4 5 6 7 8 9 10 11 12 public static <T> Single<Boolean> sequenceEqual (ObservableSource<? extends T> source1, ObservableSource<? extends T> source2) Observable.sequenceEqual (Observable.range(1 ,6 ) , Observable.range (1 ,5 ) ) .subscribe (new Consumer<Boolean>() { @Override public void accept (Boolean aBoolean) throws Exception { System.err.println(aBoolean); } }); 输出结果:false
amb
作用于两个或多个Observable
对象,但是只会发射最初的Observable对象数据。
1 2 3 4 Observable.amb(Arrays.asList(Observable.range(1 ,6 ), Observable.range(1 ,5 ))) 输出结果: 1 2 3 4 5 6
defaultIfEmpty
当数据源的数据为空时指定发送一个数据
1 2 3 4 5 Observable.create(observableEmitter -> { observableEmitter.onComplete(); }).defaultIfEmpty(777 ) 输出结果: 777
转换操作符 toList&toSortedList
将数据发射序列转成列表,toSortedList
转成升序列表
1 2 3 4 5 6 7 8 9 public final Single<List<T>> toList() Observable.just(1 ,2 ,5 ,6 ,4 ,3 ).toList() => 1 ,2 ,5 ,6 ,4 ,3 public final Single<List<T>> toSortedList() Observable.just(1 ,2 ,5 ,6 ,4 ,3 ).toSortedList() => 1 ,2 ,3 ,4 ,5 ,6 public final Single<List<T>> toSortedList(final Comparator<? super T> comparator) Observable.just(1 ,2 ,5 ,6 ,4 ,3 ).toSortedList(Comparator.comparingInt(value -> -value)) => 6 ,5 ,4 ,3 ,2 ,1
toMap&toMultiMap
将发射的数据转换成另一个类型的值,转换过程是针对每一个数据项的。源Observable的每一项数据作为value,然后用户自定义生成key的函数,最终得到Map型的结果。
toMultiMap
可以转换成一个集合对象
{% fullimage /images/RxJava-toMap.webp,toMap操作符,toMap操作符%}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Observable.just(1 ,2 ,5 ) .toMap(new Function<Integer, Object>() { @Override public Object apply (Integer integer) throws Exception { return integer + 1 ; } }) 输出结果:[2 =1 ,3 =2 ,6 =5 ] Observable.just(1 ,2 ,3 ) .toMultimap(new Function<Integer, Object>() { @Override public Object apply (Integer integer) throws Exception { return integer+1 ; } }) 输出结果:{2 =[1 ], 3 =[2 ], 4 =[3 ]}
toFlowable
用于讲一个Observable
对象转成Flowable
对象。
to
可以将一个Observable
对象转成任意类型
RxJava线程类型 RxJava的线程控制是通过subscribeOn
以及observeOn
来进行控制的。接下来列举一下RxJava中提供的线程调度器以及RxAndroid
提供的主线程调度器。
Schedulers.io()
:适用于io操作(在Android中指代子线程 ),增长或缩减来自适应的线程池,通常用于访问网络,读写文件等操作。采用的线程池是CachedThreadPool
,是无限制的,但是大量创建线程就会影响性能。
Schedulers.computation()
:计算工作默认的调度器,与I/O操作无关
Schedulers.newThread()
:代表常规的一个新线程
Schedulers.immdiate()
:允许你立即在当前线程执行任务
Schedulers.trampoline()
:当我们想执行一个任务时,并非立即执行,使用trampoline()
执行入队。将会处理它的队列并且按序运行队列中每一个任务
AndroidSchedulers.mainThread()
:指代Android系统中的主线程
RxJava基本流程分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (ObservableEmitter<Integer> observableEmitter) throws Exception { observableEmitter.onNext(123 ); observableEmitter.onComplete(); } }).subscribeOn(AndroidSchedulers.mainThread()) .observerOn(Schedulers.io()) .subscribe(new Observer<Integer>() { @Override public void onSubscribe (Disposable disposable) { } @Override public void onNext (Integer integer) { System.err.println(integer); } @Override public void onError (Throwable throwable) { } @Override public void onComplete () { } });
以上是最基础的RxJava使用流程,从这段代码中进行原理分析。
创建Observable 从Observable.create()
开始
1 2 3 4 5 public static <T> Observable<T> create (ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
Observable先调用create()
create()
中传入ObservableOnSubscribe
对象
通过new ObservableCreate()
得到最终Observable对象
ObservableCreate.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final ObservableOnSubscribe<T> source; public ObservableCreate (ObservableOnSubscribe<T> source) { this .source = source; } @Override protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
{% fullimage /images/Observable创建流程.png,Observable创建流程,Observable创建流程%}
Observable订阅 通过Observable.subscribe()
执行订阅流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public final void subscribe (Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null" ); try { observer = RxJavaPlugins.onSubscribe(this , observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins" ); subscribeActual(observer);① } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS" ); npe.initCause(e); throw npe; } }protected abstract void subscribeActual (Observer<? super T> observer) ;protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
在subscribe()
中会调用到抽象方法subscribeActual()
,就拿上节的ObservableCreate
来说,他的内部就实现了subscribeActual()
。
在subscribe()
传入的参数为Observer
类型即观察者。
Observable.subscribe()
调用到Observable.subscribeActual()
,由子类进行覆盖重写(多态 ),在其中执行的代码主要分为两部分:
Observerable.subscribe
:真正的订阅流程从这里开始
Observer.onSubscribe
:在其中进行事件分发
线程切换 RxJava通过subscribeOn()
和observerOn()
来进行线程的切换。如果未去设置线程切换相关参数,那么执行的线程都会为当前线程。
Observable.observeOn
切换subscribe()
的运行线程
从Observable.observeOn()
开始进行分析
1 2 3 4 5 6 7 8 9 10 public final Observable<T> observeOn (Scheduler scheduler) { return observeOn(scheduler, false , bufferSize()); }public final Observable<T> observeOn (Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); ObjectHelper.verifyPositive(bufferSize, "bufferSize" ); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this , scheduler, delayError, bufferSize)); }
通过Observable.observeOn()
运行之后得到了ObservableObserveOn
对象。
ObservableObserveOn.java 1 2 3 4 5 6 7 8 9 10 11 @Override protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
调用ObservableObserveOn.subscribeActual()
后,会生成ObserveOnObserver
对象,由该对象对ObserveableObserveOn
进行监听
ObservableObserveOn.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public void onNext (T t) { if (done) { return ; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } }
例如发送一个onNext()
事件,就会执行到schedule()
进行线程切换(如果设置)。worker
对应新建ObserveOnObserve
对象时的传入参数——用户自定义的Schedulers
。此处拿IoScheduler
举例,用于子线程执行操作。
IoScheduler.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public abstract static class Worker implements Disposable @NonNull public Disposable schedule (@NonNull Runnable run ) { return schedule(run, 0L , TimeUnit.NANOSECONDS); } }static final class EventLoopWorker extends Scheduler .Worker {@NonNull @Override public Disposable schedule (@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super (threadFactory); this .expirationTime = 0L ; } public long getExpirationTime () { return expirationTime; } public void setExpirationTime (long expirationTime) { this .expirationTime = expirationTime; } }
假设设置子线程执行即Schedulers.io()
,那么会优先调用到Schedulers.schedule()
由于子类没有实现该方法,所以上溯到父类中的实现。
调用到schedule()
之后,,就会调用到IoScheduler.schedule()
中,再切换到了NewThreadWorker.scheduleActual()
中
NewThreadWorker.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public ScheduledRunnable scheduleActual (final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null ) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0 ) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null ) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
在NewThreadWorker.scheduleActual()
主要执行的是使用线程池调用submit()
或execute()
去启动线程。经过层层传递最后调用到ObserveableObserveOn.run()
ObserveableObserveOn.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } }void drainNormal () { ... for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return ; } for (;;) { boolean d = done; ... a.onNext(v); } ... } }
层层回调到run()
后,在其中会继续执行onNext()
此时线程已经切换成功。
最终实现的是将目标Observe中的onNext(),onError(),onComplete()
置于指定线程中运行。
//TODO 流程图
Observable.subscribeOn
切换Observable
的运行线程
同上,先从Observable.subscribeOn()
开始分析
Observable.java 1 2 3 4 public final Observable<T> subscribeOn (Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this , scheduler)); }
操作的对象为ObservableSubscribeOn
ObservableSubscribeOn.java 1 2 3 4 5 6 7 8 @Override public void subscribeActual (final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
这里我们假设使用的是IoScheduler
,就调用到IoScheduler.scheduleDirect()
Scheduler.java 1 2 3 4 5 6 7 8 9 10 11 12 public Disposable scheduleDirect (@NonNull Runnable run) { return scheduleDirect(run, 0L , TimeUnit.NANOSECONDS); } @NonNull public Disposable scheduleDirect (@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
最后调用到DisposeTask.run()
方法去执行切换
Scheduler.java 1 2 3 4 5 6 7 8 9 10 11 static final class DisposeTask implements Disposable , Runnable , SchedulerRunnableIntrospection {public void run () { runner = Thread.currentThread(); try { decoratedRun.run(); } finally { dispose(); runner = null ; } } }
decoratedRun
指的就是上面传进来的SubscribeTask
,有执行到了它的run()
ObservableSubscribeOn.java 1 2 3 4 5 6 7 8 9 10 11 12 final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this .parent = parent; } @Override public void run () { source.subscribe(parent); } }
此时线程已经完成了切换工作。
第一次有效。
AndroidSchedulers.mainThread()
由RxAndroid提供的Android主线程切换器
AndroidSchedulers.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false ); }private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call () throws Exception { return MainHolder.DEFAULT; } });public static Scheduler mainThread () { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); }public static Scheduler from (Looper looper) { return from(looper, false ); }
mainThread()
里面新建了一个Handler
对象用来切换至主线程,还支持通过from()
设置自定义Looper来切换到其他线程。
RXJava设置线程切换时,需要通过Scheduler.createWorker()
来生成对应线程切换器
HadnlerScheduler.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 final class HandlerScheduler extends Scheduler { @Override @SuppressLint ("NewApi" ) public Disposable scheduleDirect (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException("run == null" ); if (unit == null ) throw new NullPointerException("unit == null" ); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); if (async) { message.setAsynchronous(true ); } handler.sendMessageDelayed(message, unit.toMillis(delay)); return scheduled; } @Override public Worker createWorker () { return new HandlerWorker(handler, async); } }private static final class HandlerWorker extends Worker { public Disposable schedule (Runnable run, long delay, TimeUnit unit) { ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this ; if (async) { message.setAsynchronous(true ); } handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } } @Override public void dispose () { disposed = true ; handler.removeCallbacksAndMessages(this ); } }
在dispose()
时通过移除对应的消息来取消订阅
事件分发 取消订阅 通过Disposable.dispose()
可以取消相关订阅
内容引用 RxJava1
RxJava2
RxJava3
RxJava4
RxJava5
RxJava6