阅读 93

Rxjava - 线程切换原理分析

小例子

Observable.create(object: ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        emitter.onNext("arrom")
    }

}).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object :Consumer<String>{
        override fun accept(t: String) {
            TODO("Not yet implemented")
        }
    })复制代码

Observable.create

public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}复制代码

传入一个ObservableOnSubscribe,生产ObservableCreate对象

ObservableOnSubscribe是自己写的一个内部类

ObservableOnSubscribe<String> {
    override fun subscribe(emitter: ObservableEmitter<String>) {
        emitter.onNext("arrom")
    }
}复制代码

ObservableCreate构造方法如下

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}复制代码

执行subscribeOn(Schedulers.io())方法

@NonNull
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}复制代码

生成一个ObservableSubscribeOn对象

执行observeOn(AndroidSchedulers.mainThread())方法

public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}复制代码

生成一个ObservableObserveOn对象

执行subscribe()方法

public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
}复制代码

subscribe方法中我们传了一个onNext(consumer对象)操作,进入subscribe源码里

public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
        @NonNull Action onComplete) {
    Objects.requireNonNull(onNext, "onNext is null");
    Objects.requireNonNull(onError, "onError is null");
    Objects.requireNonNull(onComplete, "onComplete is null");

    LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());

    subscribe(ls);

    return ls;
}复制代码

新建了一个LambdaObserver对象,并传入了 onNext, onError, onComplete, onSubscribe

调用subscribe方法
public final void subscribe(@NonNull Observer<? super T> observer) {
    Objects.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}复制代码

而在这里调用subscribe方法是经过observeOn方法之后的ObservableObserveOn对象。 最后调用ObservableObserveOn里面的subscribeActual方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}复制代码

线程切换

subscribeOn(Schedulers.io())会生成一个ObservableSubscribeOn对象

Schedulers.io() 会生成一个线程调度对象IoScheduler

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}复制代码

createWorker是子类实现的,所以我们直接戳进IoSchedule的createWorke方法

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}复制代码

进入EventLoopWorker类中

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}复制代码

继续看scheduleActual方法

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}复制代码

进入ScheduledRunnable类中

public void run() {
    lazySet(THREAD_INDEX, Thread.currentThread());
    try {
        try {
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
            throw e;
        }
    } finally {
        Object o = get(PARENT_INDEX);
        if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
            ((DisposableContainer)o).delete(this);
        }

        for (;;) {
            o = get(FUTURE_INDEX);
            if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                break;
            }
        }
        lazySet(THREAD_INDEX, null);
    }
}复制代码

ObservableCreate.CreateEmitter.onNext方法是在ObservableSubscribeOn.SubscribeTask的run方法里被调用的,刚才也说了,经过各种高深复杂的方式把ObservableSubscribeOn.SubscribeTask放到了一个新的Thread(Schedules.io)里面去执行,那么从ObservableCreate.CreateEmitter.onNext方法开始,后续的执行逻辑就也都在一个新的Thread(我们指定的Schedules.io)里面去了

大致的流程图

image.png


作者:深圳_Arrom
链接:https://juejin.cn/post/7025228821380988935

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐