Rxjava2调用链线程切换解析
一、Rxjava2的用法
用法很简单,但源码有点复杂,这里为了模拟多次subscribeOn和observeOn,给Observable这个类新增了两个方法和两个类,便于分析调试。
Q1: Rxjava的链式调用怎么实现的?
不看源码,还真不知道,以为链式调用肯定通过Builder设计模式实现,其实Rxjava不是,仅仅Observable这个类就有15000+的代码。链式调用采用装饰器设计模式实现,除了最后的订阅,每次链式调用一次,都套娃一次。 我们上面的Observable最后一次链式调用是ObserveOn2,那最终的Observable就是ObservableObserveOn2。将ObservableOnSubscribe称之为伪Observable,因其就只有一个subscribe方法,跟其他Observable不一样,ObservableOnSubscribe的subscribe方法可用于发射器发射数据(调用观察者observer的onNext、onComplete、onError)等。
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable; } //from Observable public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { //这里的source就是伪Observable return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
ps: 都知道Observable套娃了,那它对应的observer套娃了吗?也套娃了。
二、用户Observer的OnSubscribe方法调用流程
2.1、订阅触发-->ObservableObserveOn2.subscribe(observer)
observable就是ObservableObserveOn2,observeOn2的调用是切换到主线程。 先看其subscribe方法。
public final void subscribe(Observer<? super T> observer) { ..... subscribeActual(observer); ..... } } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); //当前线程模式 } else //我们这里是AndroidSchedulers.mainThread()--->安卓主线程模式 Scheduler.Worker w = scheduler.createWorker() source.subscribe(new ObserveOnObserver2<T>(observer, w, delayError, bufferSize)); } } 复制代码
参数observer就是用户observer,进来之后,被ObserveOnObserver2封装了下,而且把调度器、buffersize(128)也加进去了。这里切换安卓主线程的功能是来自JakeWharton的RxAndroid库(专为Rxjava适配安卓用),先看下HandlerWorker,因为面试会问,所以要讲讲。
2.1.1、Rxjava是如何切换到主线程的?
public static Scheduler mainThread() { return new HandlerScheduler(new Handler(Looper.getMainLooper()), false); } final class HandlerScheduler extends Scheduler { ..... public Worker createWorker() { return new HandlerWorker(handler, async); } } private static final class HandlerWorker extends Worker { public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ...... ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; } @Override public void dispose() { disposed = true; handler.removeCallbacksAndMessages(this /* token */); } ..... } 复制代码
兄弟们,这里线程切换就是用的handler。但是Runnalb是谁,不用猜,肯定是ObserveOnObserver2,这样就可以在他的onNext、onComplete、onError先经过Handler的切换线程,然后分发给用户observer。
static final class ObserveOnObserver2<T> implements Observer<T>, Runnable { final Observer<? super T> downstream; ...... ObserveOnObserver2(Observer<? super T> actual, Scheduler.Worker worker, int bufferSize) { this.downstream = actual; ..... } @Override public void onSubscribe(Disposable d) { //没有这个调用worker.schedule(this);方法 } @Override public void onNext(T t) { ..... worker.schedule(this); } @Override public void onError(Throwable t) { ..... worker.schedule(this); } @Override public void onComplete() { .... worker.schedule(this); } @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } void drainNormal() { ...... T v = q.poll(); downstream.onNext(v); } } 复制代码
当onNext被触发的时候,worker.schedule(this),执行线程切换,然后handler处理message时,就走run方法,这里会走drainNormal(测试时发现,连续多个observeOn调用,前面的几个observeOn都是走drainFused,只有最后一个drainNormal),然后下发给下游observer,这里就是用户observer,所以用户observer的onNext、onComplete、onNext方法所在的线程,只与距离它最近的observeOn方法中设置的线程有关。 但是用户observer的onSubscribe方法所在的线程在哪里决定?后面告知。
2.1.2、继续ObservableObserveOn2的subscribeActual
2.2、ObservableMap.subscribe(observer)
同样也是先 subscribe(observer) --> subscribeActual(observer)
map主要的操作
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { U v = mapper.apply(t) downstream.onNext(v); } ..... } 复制代码
map的操作就太简单了,将apply方法的返回值作为onNext链中的新值,往下传递。
2.3、ObservableObserveOn.subscribe(observer)
ObserveOnObserver构造传入的Worker是IoScheduler的EventLoopWorker。不用太care,io线程池,还能有啥,一般都是核心线程数量和max的线程数量一样,都等于cpu的数量即可。但是Rxjava不是这么处理的,稍复杂,这里不讨论。就只看成线程池提交任务处理即可。ps:线程池处理的是其下游的observer的onNext、onComplete、onError,不是当前ObserveOnObserver的。 与之前说的:用户observer的onNext由距离它最近的observeOn设置的线程决定,同理。
2.4、ObservableSubscribeOn2.subscribe(observer)
现在转到subscribeOn了,跟之前observeOn不一样。 先看subscribeActual方法中observer.onSubscribe(parent),这里observer是ObserveOnObserver 这么快这个下游的onSubscribe被执行了,一直以为是从最里层ObservableCreate方法的subscribeActual调用Observer的onSubscribe开始呢,实际上距离ObserveOn最近的一个subscribeOn就开始调用下游的onSubscribe了。
2.4.1、ObserveOnObserver.onSubscribe
@Override public void onSubscribe(Disposable d) { //d就是SubscribeOnObserver2 if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; if (d instanceof QueueDisposable) { ...... return } queue = new SpscLinkedArrayQueue<T>(bufferSize); //继续调用下游的onSubscribe downstream.onSubscribe(this); } } 复制代码
这里的d就是SubscribeOnObserver2,他不是QueueDisposable类型,所以这里会继续调用下游的onSubscribe。ObserveOnObserver的下游是MapObserver。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> { 复制代码
// from MapObserver public final void onSubscribe(Disposable d) { if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; if (d instanceof QueueDisposable) { this.qd = (QueueDisposable<T>)d; } downstream.onSubscribe(this) } } 复制代码
这里MapObserver是个QueueDisposable,也会调用下游的onSusbcribe,MapObserver的下游是ObserveOnObserver2。
// from ObserveOnObserver2 @Override public void onSubscribe(Disposable d) { //d就是SubscribeOnObserver2 if (DisposableHelper.validate(this.upstream, d)) { this.upstream = d; if (d instanceof QueueDisposable) { ...... return } queue = new SpscLinkedArrayQueue<T>(bufferSize); //继续调用下游的onSubscribe downstream.onSubscribe(this); } } 复制代码
这里ObserveOnObserver2就调用用户observer的onSubscribe了。
总结一下用户observer的onSusbcribe的调用过程 那用户observer的onSubscribe回调了,请问它处在哪个线程?
就是Rxjava发起订阅所在的线程。
三、用户Observer的onNext方法调用流程
3.1、继续从ObservableSubscribeOn2的subscribeActual开始
final class SubscribeTask implements Runnable { private final SubscribeOnObserver2<T> parent; SubscribeTask(SubscribeOnObserver2<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } } public Disposable scheduleDirect(@NonNull Runnable run) { final Worker w = new EventLoopWorker(pool.get()) w.schedule(run, 0, TimeUnit.NANOSECONDS); return task; } 复制代码
又看到了source.subscribe(parent),此时说明subscribeOn2(schedulers.IO)决定了 ObservableSubscribeOn.subscribe(subscribeOnObserver2)方法执行的线程 ,真的是厉害哦。
3.2、ObservableSubscribeOn的subscribe(observer)
从subscribe(observer)---->subscribeActual(observer)
observer.onSubscribe(parent),表示subscribeOnObserver2的onSubscribe被调用。 就只是给subscribeOnObserver2的upstream字段设置值为subscribeOnObserver,所以subscribeOnObserver2的上游是subscribeOnObserver,下游是从构造传入的,为observeOnObserver。
final class SubscribeTask implements Runnable { private final SubscribeOnObserver2<T> parent; SubscribeTask(SubscribeOnObserver2<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } } public Disposable scheduleDirect(@NonNull Runnable run) { final Worker w = new EventLoopWorker(pool.get().getEventLoop()); w.schedule(run, 0, TimeUnit.NANOSECONDS); return task; } 复制代码
这里的Schedulers.computation()就简单看成是computation线程池执行任务source.subscribe(parent) 就行, 表明subscribeOn(Schedulers.computation())决定了
ObservableCreate.subscribe(subscribeOnObserver)方法执行的线程。
3.3、ObservableCreate.subscribe(observer)
从subscribe(observer)---->subscribeActual(observer) 使用subscribeOnObserver创建了发射器CreateEmitter。
subscribeOnObserver调用onSubscribe(parent),设置上游observer为createEmitter
source.subscribe(parent),这里的source就是那个伪Observable(ObservableOnSubscribe)。 这里终于看到了emitter的onNext调用,前面提到Schedulers.computation()决定了这里的subscribe方法执行的线程。
3.4、CreateEmitter.onNext方法
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } 复制代码
observer.onNext直接就是subscribeOnObserver的onNext调用了
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { @Override public void onNext(T t) { downstream.onNext(t); } ..... } 复制代码
downstream.onNext直接就是subscribeOnObserver2的onNext调用了
static final class SubscribeOnObserver2<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable @Override public void onNext(T t) { downstream.onNext(t); } ..... } 复制代码
downstream.onNext直接就是observeOnObserver的onNext调用了,onNext此时所在的线程还是computation线程。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { @Override public void onNext(T t) { ...... if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); //将数据放到队列 } if (getAndIncrement() == 0) worker.schedule(this); } } @Override public void run() { if (outputFused) { drainFused() } else { drainNormal(); } } void drainNormal() { ..... T v = q.poll(); downstream.onNext(v); } } 复制代码
同样也是将数据往下游分发,observeOnObserver下游的mapObserver, 它的onNext将会在IO线程。
mapObserver的onNext只是将数据做个简单的处理,然后继续下游分发数据。
@Override public void onNext(T t) { ...... U v = mapper.apply(t) downstream.onNext(v); } 复制代码
mapObserver的下游observeOnObserver2的onNext方法继续分发数据。
static final class ObserveOnObserver2<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { @Override public void onNext(T t) { ...... if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); //将数据放到队列 } if (getAndIncrement() == 0) worker.schedule(this); } } @Override public void run() { if (outputFused) { drainFused() } else { drainNormal(); } } void drainNormal() { ..... T v = q.poll(); downstream.onNext(v); } } 复制代码
跟之前的observeOnObserver一样代码,此时observeOnObserver2的onNext跟MapObserver的onNext是在同一个线程,都是在io线程执行,observeOnObserver2的下游就是用户observer了,在这里经过主线程代码切换后,用户observer的onNext的方法就在主线程成执行了。
四、分析线程
有人说多次subscribeOn,只有第一次生效,其实每次都生效。
每次subscribeOn设置线程都会对它source Observable的subscribe方法(或者subscribeActual)产生影响。
每次observeOn设置线程都会对它下游的Observer的onNext、onComplete、onError产生影响。
被观察者 | subscribe方法 | 观察者 | onSubscribe方法 | onNext/onComplete/onError方法 |
---|---|---|---|---|
伪Observable | computation线程 | CreateEmitter | - | computation线程 |
ObservableCreate | computation线程 | CreateEmitter | - | computation线程 |
ObservableSubscribeOn | io线程 | SubscribeOnObserver | computation线程 | computation线程 |
ObservableSubscribeOn2 | 用户线程 | SubscribeOnObserver2 | io线程 | computation线程 |
ObservableObserveOn | 用户线程 | ObserveOnObserver | 用户线程 | computation线程 |
ObservableMap | 用户线程 | MapObserver | 用户线程 | io线程 |
ObservableObserveOn2 | 用户线程 | ObserveOnObserver2 | 用户线程 | io线程 |
- | - | 用户observer | 用户线程 | 主线程 |
ps:用户线程就是指observable.subscribe(observer)代码调用的线程。 CreateEmitter不是观者者,没有继承Observer,它只是含有onNext、onComplete、onError方法而已。
作者:信仰年轻
链接:https://juejin.cn/post/7015609207470686221