RxJava - How Thread Switching Works
A small example
Observable.create(object : ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        emitter.onNext("arrom")
    }
}).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Consumer<String> {
        override fun accept(t: String) {
            println(t) // delivered on the Android main thread
        }
    })
Observable.create
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
It takes an ObservableOnSubscribe and produces an ObservableCreate object.
The ObservableOnSubscribe here is the anonymous class we wrote ourselves:
object : ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        emitter.onNext("arrom")
    }
}
The ObservableCreate constructor simply stores it:
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
Calling subscribeOn(Schedulers.io())
@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
This produces an ObservableSubscribeOn object that wraps the ObservableCreate (passed in as this).
Calling observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
This produces an ObservableObserveOn object that wraps the ObservableSubscribeOn.
Calling subscribe()
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}
We passed subscribe a single onNext action (the Consumer object). Stepping into the three-argument overload it delegates to:
public final Disposable subscribe(@NonNull Consumer<? super T> onNext,
        @NonNull Consumer<? super Throwable> onError,
        @NonNull Action onComplete) {
    Objects.requireNonNull(onNext, "onNext is null");
    Objects.requireNonNull(onError, "onError is null");
    Objects.requireNonNull(onComplete, "onComplete is null");

    LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

    subscribe(ls);

    return ls;
}
A new LambdaObserver is created and given onNext, onError, onComplete, and onSubscribe (here Functions.emptyConsumer()).
It is then passed to subscribe(Observer):
public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
The object on which subscribe is called here is the ObservableObserveOn returned by observeOn, so the call ends up in ObservableObserveOn's subscribeActual method:
@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}
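The two directions involved so far can be easy to mix up: operators wrap each other from the inside out at assembly time, while subscribe travels from the outermost wrapper back toward the source. The following is a minimal JDK-only sketch of that shape. The classes Node, Create, SubscribeOn, and ObserveOn are hypothetical stand-ins invented for illustration, not RxJava's real classes, and no threads are involved yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the operator chain; these are not RxJava's classes.
class ChainSketch {
    /** Stand-in for Observable: subscribe() records itself, then calls subscribeActual(). */
    abstract static class Node {
        final List<String> log;

        Node(List<String> log) {
            this.log = log;
        }

        final void subscribe() {
            log.add(getClass().getSimpleName());
            subscribeActual();
        }

        abstract void subscribeActual();
    }

    /** Innermost node, playing the role of ObservableCreate. */
    static final class Create extends Node {
        Create(List<String> log) {
            super(log);
        }

        @Override
        void subscribeActual() {
            log.add("emit");
        }
    }

    /** Plays the role of ObservableSubscribeOn: it only remembers its upstream. */
    static final class SubscribeOn extends Node {
        final Node source;

        SubscribeOn(Node source) {
            super(source.log);
            this.source = source;
        }

        @Override
        void subscribeActual() {
            source.subscribe();
        }
    }

    /** Plays the role of ObservableObserveOn: the outermost wrapper in the example. */
    static final class ObserveOn extends Node {
        final Node source;

        ObserveOn(Node source) {
            super(source.log);
            this.source = source;
        }

        @Override
        void subscribeActual() {
            source.subscribe();
        }
    }

    /** Assembles create -> subscribeOn -> observeOn, subscribes, and returns the call order. */
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Node chain = new ObserveOn(new SubscribeOn(new Create(log)));
        chain.subscribe();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

In this sketch the log starts at ObserveOn and ends at the emission, which mirrors why the subscribe call in the article first lands in ObservableObserveOn.subscribeActual.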
Thread switching
subscribeOn(Schedulers.io()) produces an ObservableSubscribeOn object.
Schedulers.io() supplies the scheduler that does the thread dispatching, an IoScheduler.
// ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

// Scheduler: the one-argument scheduleDirect(run) used above delegates here with a zero delay
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
createWorker is implemented by subclasses, so we go straight to IoScheduler's createWorker method:
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
Inside the EventLoopWorker class, schedule looks like this:
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
Next, the scheduleActual method, where the task is finally handed to an executor:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
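Stripped of the disposal bookkeeping, the thread hop in scheduleActual comes down to giving a task to a scheduled executor: submit when there is no delay, schedule otherwise. A minimal JDK-only sketch of just that step follows. The class ExecutorHopSketch, the method runOnWorker, and the thread name "sketch-worker" are made up for illustration, and the single-thread executor is an assumption rather than how IoScheduler actually builds its pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: only the submit/schedule hand-off, with no disposal handling.
class ExecutorHopSketch {
    /** Runs a task on a worker thread and returns the name of the thread that executed it. */
    static String runOnWorker(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "sketch-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> f;
            if (delayMillis <= 0) {
                f = executor.submit(task); // run as soon as the worker is free
            } else {
                f = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            }
            return f.get(5, TimeUnit.SECONDS); // wait for the worker thread's answer
        } catch (Exception e) {
            throw new IllegalStateException("task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("task:   " + runOnWorker(0));
    }
}
```

Whatever thread calls runOnWorker, the task body reports the executor's thread, which is the whole effect subscribeOn relies on.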
Inside the ScheduledRunnable class, run executes the wrapped task on the executor's thread:
public void run() {
    lazySet(THREAD_INDEX, Thread.currentThread());
    try {
        try {
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
            throw e;
        }
    } finally {
        Object o = get(PARENT_INDEX);
        if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
            ((DisposableContainer)o).delete(this);
        }

        for (;;) {
            o = get(FUTURE_INDEX);
            if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                break;
            }
        }
        lazySet(THREAD_INDEX, null);
    }
}
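ObservableCreate.CreateEmitter.onNext is invoked from within the run method of ObservableSubscribeOn.SubscribeTask. As traced above, scheduleDirect, the worker, and finally the executor hand that SubscribeTask to a thread owned by the scheduler we specified (Schedulers.io()). So everything from ObservableCreate.CreateEmitter.onNext onward also runs on that Schedulers.io() thread, until the ObserveOnObserver created in ObservableObserveOn.subscribeActual passes delivery to the worker of the observeOn scheduler.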
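To see both switches side by side, here is a minimal JDK-only sketch of the idea, not RxJava's implementation. MiniObservable, the helper named, and the thread names "sketch-io" and "sketch-main" are hypothetical names invented for this illustration, and plain single-thread executors stand in for Schedulers.io() and AndroidSchedulers.mainThread(). It assumes the simplest possible behavior: subscribeOn posts the upstream subscription to its executor, and observeOn re-posts each value to its executor before delivery.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature; MiniObservable is not an RxJava type.
class ThreadSwitchSketch {
    /** A source is anything that can push values into a downstream consumer. */
    interface MiniObservable<T> {
        void subscribe(Consumer<T> downstream);

        /** Like subscribeOn: the upstream subscription itself runs on the executor. */
        default MiniObservable<T> subscribeOn(ExecutorService executor) {
            MiniObservable<T> source = this;
            return downstream -> executor.execute(() -> source.subscribe(downstream));
        }

        /** Like observeOn: every value is re-posted to the executor before delivery. */
        default MiniObservable<T> observeOn(ExecutorService executor) {
            MiniObservable<T> source = this;
            return downstream ->
                    source.subscribe(value -> executor.execute(() -> downstream.accept(value)));
        }
    }

    /** Single daemon thread with a recognizable name, standing in for a scheduler. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Returns [name of the emitting thread, name of the consuming thread]. */
    static List<String> run() {
        ExecutorService io = named("sketch-io");
        ExecutorService mainLike = named("sketch-main");
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        MiniObservable<String> source = downstream -> {
            threads.add(Thread.currentThread().getName()); // where the emission happens
            downstream.accept("arrom");
        };

        source.subscribeOn(io)
                .observeOn(mainLike)
                .subscribe(value -> {
                    threads.add(Thread.currentThread().getName()); // where the consumer runs
                    done.countDown();
                });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the value");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            mainLike.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The real operators add disposal, queues, and error handling on top, but the division of labor is the same: subscribeOn decides where the source starts emitting, and observeOn decides where everything downstream of it receives values.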
[Figure: rough flowchart of the process]
Author: 深圳_Arrom
Link: https://juejin.cn/post/7025228821380988935