阅读 117

Rxjava2调用链线程切换解析

一、Rxjava2的用法

image.png 用法很简单,但源码有点复杂,这里为了模拟多次subscribeOn和observeOn,给Observable这个类新增了两个方法和两个类,便于分析调试。 image.png image.png

image.png image.png

  • Q1: Rxjava的链式调用怎么实现的?

不看源码,还真不知道,以为链式调用肯定通过Builder设计模式实现,其实Rxjava不是,仅仅Observable这个类就有15000+的代码。链式调用采用装饰器设计模式实现,除了最后的订阅,每次链式调用一次,都套娃一次。 我们上面的Observable最后一次链式调用是ObserveOn2,那最终的Observable就是ObservableObserveOn2。将ObservableOnSubscribe称之为伪Observable,因其就只有一个subscribe方法,跟其他Observable不一样,ObservableOnSubscribe的subscribe方法可用于发射器发射数据(调用观察者observer的onNext、onComplete、onError)等。 image.png

public interface ObservableOnSubscribe<T> {     void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable; } //from Observable public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {     //这里的source就是伪Observable     return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码

image.png

ps: 都知道Observable套娃了,那它对应的observer套娃了吗?也套娃了。 image.png

二、用户Observer的OnSubscribe方法调用流程

image.png

  • 2.1、订阅触发-->ObservableObserveOn2.subscribe(observer)

observable就是ObservableObserveOn2,observeOn2的调用是切换到主线程。 image.png 先看其subscribe方法。

public final void subscribe(Observer<? super T> observer) {         .....         subscribeActual(observer);         .....     } } @Override protected void subscribeActual(Observer<? super T> observer) {     if (scheduler instanceof TrampolineScheduler) {         source.subscribe(observer); //当前线程模式     } else  //我们这里是AndroidSchedulers.mainThread()--->安卓主线程模式         Scheduler.Worker w = scheduler.createWorker()         source.subscribe(new ObserveOnObserver2<T>(observer, w, delayError, bufferSize));     } } 复制代码

参数observer就是用户observer,进来之后,被ObserveOnObserver2封装了下,而且把调度器、buffersize(128)也加进去了。这里切换安卓主线程的功能是来自JakeWharton的RxAndroid库(专为Rxjava适配安卓用),先看下HandlerWorker,因为面试会问,所以要讲讲

  • 2.1.1、Rxjava是如何切换到主线程的?

public static Scheduler mainThread() {     return new HandlerScheduler(new Handler(Looper.getMainLooper()), false); } final class HandlerScheduler extends Scheduler {    .....     public Worker createWorker() {         return new HandlerWorker(handler, async);     } } private static final class HandlerWorker extends Worker {     public Disposable schedule(Runnable run, long delay, TimeUnit unit) {         ......         ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);         Message message = Message.obtain(handler, scheduled);         message.obj = this;          handler.sendMessageDelayed(message, unit.toMillis(delay));         if (disposed) {             handler.removeCallbacks(scheduled);             return Disposables.disposed();         }         return scheduled;     }     @Override     public void dispose() {         disposed = true;         handler.removeCallbacksAndMessages(this /* token */);     }     ..... } 复制代码

兄弟们,这里线程切换就是用的handler。但是Runnalb是谁,不用猜,肯定是ObserveOnObserver2,这样就可以在他的onNext、onComplete、onError先经过Handler的切换线程,然后分发给用户observer。

static final class ObserveOnObserver2<T> implements Observer<T>, Runnable {     final Observer<? super T> downstream;     ......     ObserveOnObserver2(Observer<? super T> actual, Scheduler.Worker worker, int bufferSize) {         this.downstream = actual;         .....     }     @Override     public void onSubscribe(Disposable d) {        //没有这个调用worker.schedule(this);方法     }     @Override     public void onNext(T t) {         .....         worker.schedule(this);     }     @Override     public void onError(Throwable t) {         .....        worker.schedule(this);     }     @Override     public void onComplete() {          ....         worker.schedule(this);     }      @Override      public void run() {         if (outputFused) {             drainFused();         } else {             drainNormal();         }      }      void drainNormal() {         ......         T v  = q.poll();         downstream.onNext(v);      } } 复制代码

当onNext被触发的时候,worker.schedule(this),执行线程切换,然后handler处理message时,就走run方法,这里会走drainNormal(测试时发现,连续多个observeOn调用,前面的几个observeOn都是走drainFused,只有最后一个drainNormal),然后下发给下游observer,这里就是用户observer,所以用户observer的onNext、onComplete、onNext方法所在的线程,只与距离它最近的observeOn方法中设置的线程有关。 但是用户observer的onSubscribe方法所在的线程在哪里决定?后面告知。

  • 2.1.2、继续ObservableObserveOn2的subscribeActual

image.png

  • 2.2、ObservableMap.subscribe(observer)

同样也是先 subscribe(observer) --> subscribeActual(observer) image.png

map主要的操作 image.png

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {     final Function<? super T, ? extends U> mapper;     MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {         super(actual);         this.mapper = mapper;     }     @Override     public void onNext(T t) {         U v = mapper.apply(t)         downstream.onNext(v);     }     ..... } 复制代码

map的操作就太简单了,将apply方法的返回值作为onNext链中的新值,往下传递。

  • 2.3、ObservableObserveOn.subscribe(observer)

image.png image.png ObserveOnObserver构造传入的Worker是IoScheduler的EventLoopWorker。不用太care,io线程池,还能有啥,一般都是核心线程数量和max的线程数量一样,都等于cpu的数量即可。但是Rxjava不是这么处理的,稍复杂,这里不讨论。就只看成线程池提交任务处理即可。ps:线程池处理的是其下游的observer的onNext、onComplete、onError,不是当前ObserveOnObserver的。 与之前说的:用户observer的onNext由距离它最近的observeOn设置的线程决定,同理。

  • 2.4、ObservableSubscribeOn2.subscribe(observer)

image.png 现在转到subscribeOn了,跟之前observeOn不一样。 image.png 先看subscribeActual方法中observer.onSubscribe(parent),这里observer是ObserveOnObserver 这么快这个下游的onSubscribe被执行了,一直以为是从最里层ObservableCreate方法的subscribeActual调用Observer的onSubscribe开始呢,实际上距离ObserveOn最近的一个subscribeOn就开始调用下游的onSubscribe了。

  • 2.4.1、ObserveOnObserver.onSubscribe

@Override public void onSubscribe(Disposable d) {   //d就是SubscribeOnObserver2     if (DisposableHelper.validate(this.upstream, d)) {         this.upstream = d;         if (d instanceof QueueDisposable) {           ......           return          }         queue = new SpscLinkedArrayQueue<T>(bufferSize);         //继续调用下游的onSubscribe         downstream.onSubscribe(this);     } } 复制代码

这里的d就是SubscribeOnObserver2,他不是QueueDisposable类型,所以这里会继续调用下游的onSubscribe。ObserveOnObserver的下游是MapObserver。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> { 复制代码

 // from MapObserver public final void onSubscribe(Disposable d) {     if (DisposableHelper.validate(this.upstream, d)) {         this.upstream = d;         if (d instanceof QueueDisposable) {             this.qd = (QueueDisposable<T>)d;         }        downstream.onSubscribe(this)     } } 复制代码

这里MapObserver是个QueueDisposable,也会调用下游的onSusbcribe,MapObserver的下游是ObserveOnObserver2。

// from  ObserveOnObserver2 @Override public void onSubscribe(Disposable d) {   //d就是SubscribeOnObserver2     if (DisposableHelper.validate(this.upstream, d)) {         this.upstream = d;         if (d instanceof QueueDisposable) {           ......           return          }         queue = new SpscLinkedArrayQueue<T>(bufferSize);         //继续调用下游的onSubscribe         downstream.onSubscribe(this);     } } 复制代码

这里ObserveOnObserver2就调用用户observer的onSubscribe了。

总结一下用户observer的onSusbcribe的调用过程 image.png 那用户observer的onSubscribe回调了,请问它处在哪个线程?
就是Rxjava发起订阅所在的线程。 image.png

三、用户Observer的onNext方法调用流程

  • 3.1、继续从ObservableSubscribeOn2的subscribeActual开始

image.png image.png

final class SubscribeTask implements Runnable {     private final SubscribeOnObserver2<T> parent;     SubscribeTask(SubscribeOnObserver2<T> parent) {         this.parent = parent;     }     @Override     public void run() {         source.subscribe(parent);     } } public Disposable scheduleDirect(@NonNull Runnable run) {     final Worker w = new EventLoopWorker(pool.get())     w.schedule(run, 0, TimeUnit.NANOSECONDS);     return task; } 复制代码

又看到了source.subscribe(parent),此时说明subscribeOn2(schedulers.IO)决定了 ObservableSubscribeOn.subscribe(subscribeOnObserver2)方法执行的线程 ,真的是厉害哦。

  • 3.2、ObservableSubscribeOn的subscribe(observer)

从subscribe(observer)---->subscribeActual(observer) image.png image.png

observer.onSubscribe(parent),表示subscribeOnObserver2的onSubscribe被调用。 image.png 就只是给subscribeOnObserver2的upstream字段设置值为subscribeOnObserver,所以subscribeOnObserver2的上游是subscribeOnObserver,下游是从构造传入的,为observeOnObserver。

final class SubscribeTask implements Runnable {     private final SubscribeOnObserver2<T> parent;     SubscribeTask(SubscribeOnObserver2<T> parent) {         this.parent = parent;     }     @Override     public void run() {         source.subscribe(parent);     } } public Disposable scheduleDirect(@NonNull Runnable run) {     final Worker w = new EventLoopWorker(pool.get().getEventLoop());     w.schedule(run, 0, TimeUnit.NANOSECONDS);     return task; } 复制代码

这里的Schedulers.computation()就简单看成是computation线程池执行任务source.subscribe(parent) 就行, 表明subscribeOn(Schedulers.computation())决定了
ObservableCreate.subscribe(subscribeOnObserver)方法执行的线程

  • 3.3、ObservableCreate.subscribe(observer)

从subscribe(observer)---->subscribeActual(observer) image.png 使用subscribeOnObserver创建了发射器CreateEmitter。
subscribeOnObserver调用onSubscribe(parent),设置上游observer为createEmitter
source.subscribe(parent),这里的source就是那个伪Observable(ObservableOnSubscribe)。 image.png 这里终于看到了emitter的onNext调用,前面提到Schedulers.computation()决定了这里的subscribe方法执行的线程。

  • 3.4、CreateEmitter.onNext方法

@Override public void onNext(T t) {     if (t == null) {         onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));         return;     }     if (!isDisposed()) {         observer.onNext(t);     } } 复制代码

observer.onNext直接就是subscribeOnObserver的onNext调用了

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {         @Override         public void onNext(T t) {             downstream.onNext(t);            }        ..... } 复制代码

downstream.onNext直接就是subscribeOnObserver2的onNext调用了

static final class SubscribeOnObserver2<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable      @Override     public void onNext(T t) {         downstream.onNext(t);        }        ..... } 复制代码

downstream.onNext直接就是observeOnObserver的onNext调用了,onNext此时所在的线程还是computation线程。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {     @Override     public void onNext(T t) {          ......         if (sourceMode != QueueDisposable.ASYNC) {             queue.offer(t); //将数据放到队列         }        if (getAndIncrement() == 0)              worker.schedule(this);         }     }       @Override     public void run() {         if (outputFused) {             drainFused()         } else {             drainNormal();         }     }     void drainNormal() {         .....          T v = q.poll();          downstream.onNext(v);     } } 复制代码

同样也是将数据往下游分发,observeOnObserver下游的mapObserver, 它的onNext将会在IO线程。 image.png

mapObserver的onNext只是将数据做个简单的处理,然后继续下游分发数据。

@Override public void onNext(T t) {    ......    U v = mapper.apply(t)    downstream.onNext(v); } 复制代码

mapObserver的下游observeOnObserver2的onNext方法继续分发数据。 image.png

static final class ObserveOnObserver2<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {     @Override     public void onNext(T t) {          ......         if (sourceMode != QueueDisposable.ASYNC) {             queue.offer(t); //将数据放到队列         }        if (getAndIncrement() == 0)              worker.schedule(this);         }     }       @Override     public void run() {         if (outputFused) {             drainFused()         } else {             drainNormal();         }     }     void drainNormal() {         .....          T v = q.poll();          downstream.onNext(v);     } } 复制代码

跟之前的observeOnObserver一样代码,此时observeOnObserver2的onNext跟MapObserver的onNext是在同一个线程,都是在io线程执行,observeOnObserver2的下游就是用户observer了,在这里经过主线程代码切换后,用户observer的onNext的方法就在主线程成执行了。

四、分析线程

image.png

image.png 有人说多次subscribeOn,只有第一次生效,其实每次都生效。
每次subscribeOn设置线程都会对它source Observable的subscribe方法(或者subscribeActual)产生影响。
每次observeOn设置线程都会对它下游的Observer的onNext、onComplete、onError产生影响。

被观察者subscribe方法观察者onSubscribe方法onNext/onComplete/onError方法
伪Observablecomputation线程CreateEmitter-computation线程
ObservableCreatecomputation线程CreateEmitter-computation线程
ObservableSubscribeOnio线程SubscribeOnObservercomputation线程computation线程
ObservableSubscribeOn2用户线程SubscribeOnObserver2io线程computation线程
ObservableObserveOn用户线程ObserveOnObserver用户线程computation线程
ObservableMap用户线程MapObserver用户线程io线程
ObservableObserveOn2用户线程ObserveOnObserver2用户线程io线程
--用户observer用户线程主线程

ps:用户线程就是指observable.subscribe(observer)代码调用的线程。 CreateEmitter不是观者者,没有继承Observer,它只是含有onNext、onComplete、onError方法而已。


作者:信仰年轻
链接:https://juejin.cn/post/7015609207470686221


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐