阅读 146

探索Android开源框架 - 5. EventBus使用及源码解析

相关概念

定义

  • Android事件发布/订阅框架,简化应用程序内组件间,组件与后台线程间的通信;

优点

  • 代码简洁,使用简单,将事件发布和订阅充分解耦;

五种ThreadMode

POSTING(默认)

  • 接收事件方法应执行在发射事件方法所在的线程

MAIN

  • 接收事件方法应执行在主线程

MAIN_ORDERED

  • 接收事件方法会被扔进 MessageQueue 中等待执行

BACKGROUND

  • 发射事件方法在主线程中执行,则接收事件方法应执行在 EventBus 维护的单一子线程 执行

  • 发射事件方法在子线程中执行,则接收事件方法应执行在发射事件方法所在的线程

ASYNC

  • 接收方法应执行在不同于发射事件方法所在的另一个线程。常用于耗时操作

使用

添加依赖

  • 导入依赖包

implementation 'org.greenrobot:eventbus:3.2.0'复制代码

Subscriber Index[可选]

  • 作者在EventBus 3中引入了EventBusAnnotationProcessor(注解分析生成索引)技术,大大提高了EventBus的运行效率;

1. java
android {
    defaultConfig {
        javaCompileOptions {
            annotationProcessorOptions {
                arguments = [ eventBusIndex : 'com.example.myapp.MyEventBusIndex' ]
            }
        }
    }
}

dependencies {
    def eventbus_version = '3.2.0'
    implementation "org.greenrobot:eventbus:$eventbus_version"
    annotationProcessor "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
}复制代码
2. kotlin
apply plugin: 'kotlin-kapt' // ensure kapt plugin is applied

dependencies {
    def eventbus_version = '3.2.0'
    implementation "org.greenrobot:eventbus:$eventbus_version"
    kapt "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
}

kapt {
    arguments {
        arg('eventBusIndex', 'com.example.myapp.MyEventBusIndex')
    }
}复制代码
  • 此时需要我们先编译一次,生成索引类。编译成功之后,就会发现在\ProjectName\app\build\generated\source\apt\PakageName\下看到通过注解分析生成的索引类,这样我们便可以在初始化EventBus时应用我们生成的索引了

使用
  • 要应用我们生成好的索引时

val mEventBus = EventBus.builder().addIndex(MyEventBusIndex()).build()复制代码
  • 如果想把自定义的设置应用到EventBus默认的单例中,则可以用installDefaultEventBus()方法

EventBus.builder().addIndex(MyEventBusIndex()).installDefaultEventBus()复制代码

基本使用

  • 基本使用步骤分为注册,自定义Event,订阅,发布,注销几个步骤,其中自定义Event一般单独提取出来复用,发布一般在其他页面或组件中执行,示例代码如下

class EventBusActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_event_bus)
    }

    override fun onStart() {
        super.onStart()
        //1. 注册
        EventBus.getDefault().register(this)
    }

    //2. 自定义Event
    class MessageEvent(val what: Int)

    //3. 订阅:通过设置不同的ThreadMode来指定订阅者的工作线程
    @Subscribe(threadMode = ThreadMode.POSTING)
//    @Subscribe(threadMode = ThreadMode.MAIN)
//    @Subscribe(threadMode = ThreadMode.MAIN_ORDERED)
//    @Subscribe(threadMode = ThreadMode.BACKGROUND)
//    @Subscribe(threadMode = ThreadMode.ASYNC)
    fun onMessageEvent(event: MessageEvent?) {
        LjyLogUtil.d("${Thread.currentThread().name}_event.what=${event?.what}")
    }

    fun onBtnCLick(view: android.view.View) {
        when (view.id) {
            R.id.btn_postInMain -> {
                //4.1 在主线程发布
                LjyLogUtil.d("${Thread.currentThread().name}_post")
                EventBus.getDefault().post(MessageEvent(1001))
            }
            R.id.btn_postInThread -> {
                //4.2 在子线程发布
                Thread {
                    LjyLogUtil.d("${Thread.currentThread().name}_post")
                    EventBus.getDefault().post(MessageEvent(1002))
                }.start()
            }
        }
    }

    override fun onStop() {
        super.onStop()
        //5. 注销
        EventBus.getDefault().unregister(this)
    }
}复制代码

粘性事件

  1. 订阅者注解开启sticky

@Subscribe(threadMode = ThreadMode.MAIN, sticky = true)
fun onMessageEventSticky(event: MessageEvent?) {
    LjyLogUtil.d("${Thread.currentThread().name}_event.what=${event?.what}")
}复制代码
  1. 发布时调用postSticky方法

EventBus.getDefault().postSticky(MessageEvent(1003))复制代码

源码解析

EventBus.getDefault

  • 通过上面的使用我们发现都是调用EventBus.getDefault获取的EventBus实例,那么以此为入口,看一下其实现代码

public static EventBus getDefault() {
    EventBus instance = defaultInstance;
    //一个经典的懒汉式 double check 获取单例
    if (instance == null) {
        synchronized (EventBus.class) {
            instance = EventBus.defaultInstance;
            if (instance == null) {
                instance = EventBus.defaultInstance = new EventBus();
            }
        }
    }
    return instance;
}复制代码

EventBus 构造方法

  • 通过上面的getDefault中调用的无参构造方法我们点进来看一下

public EventBus() {
    this(DEFAULT_BUILDER);
}复制代码
  • DEFAULT_BUILDER是默认的EventBusBuilder

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();复制代码
  • this调用了EventBus另一个构造函数, 主要做一些初始化工作

EventBus(EventBusBuilder builder) {
    logger = builder.getLogger();
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    mainThreadSupport = builder.getMainThreadSupport();
    //下面三个Poster很重要
    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);

    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}复制代码

register

  • 下面我们看一下订阅者的注册方法register

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    //寻找 subscriber(我们实例中register(this)的this,当前Activity,Fragment等)中订阅的方法
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            //注册订阅
            subscribe(subscriber, subscriberMethod);
        }
    }
}复制代码
  • 可以看到register中主要是调用findSubscriberMethods查找订阅方法,再调用subscribe进行订阅

findSubscriberMethods

  • 查找传进来的订阅者的所有订阅方法,保存到SubscriberMethod集合中;

  • SubscriberMethod类主要是保存订阅方法的Method对象、线程模式、事件类型、优先级、是否是粘性事件等属性;

//存着注册类与其所有需要回调的 Event 方法列表的键值对
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    //先从缓存中取
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }
    //判断是否忽略注解器生成的MyEventBusIndex,也就是我们开头添加依赖时的Subscriber Index部分
    if (ignoreGeneratedIndex) {
        //通过反射获取
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    //如果订阅者中不存在被@Subscribe注解的public的方法,则抛出异常
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        //如果订阅者中存在订阅方法
        //放入缓存中
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        //返回集合
        return subscriberMethods;
    }
}

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    //初始化 FindState 对象
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    // while 循环中,不停地去反射获取当前类和其父类的订阅方法并添入列表中,
    //注意,在 Java 中,如果当前类实现了一个接口,即使该接口的方法被 @Subscribe 所修饰,
    //当前类中的方法也是不包含该注解属性的,所以如果在接口中对某个方法使用了 @Subscribe 修饰然后让类去实现这个接口是没有任何作用的
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    最终返回这个列表并重置 FindState 对象利于下一次重复使用
    return getMethodsAndRelease(findState);
}复制代码
  • 从上面代码我们可以知道,调用register的类中,需要有被@Subscribe注解的方法,且必须为public方法;

  • 上面代码中ignoreGeneratedIndex默认为false的,项目中经常通过EventBus单例模式来获取默认的EventBus对象,也就是ignoreGeneratedIndex为false的情况,这种情况调用了findUsingInfo方法:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        //获取订阅者信息,没有配置MyEventBusIndex返回null
        findState.subscriberInfo = getSubscriberInfo(findState);

        if (findState.subscriberInfo != null) {
            //如果通过EventBusBuilder配置了MyEventBusIndex,便会获取到subscriberInfo,
            //调用subscriberInfo的getSubscriberMethods方法可以得到订阅方法相关的信息,
            //这个时候就不在需要通过注解进行获取订阅方法
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                //将订阅方法保存到findState
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            //通过反射查找订阅方法
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    //对findState做回收处理并反回订阅方法的List集合
    return getMethodsAndRelease(findState);
}复制代码
  • findUsingReflectionInSingleClass通过Java的反射和对注解的解析查找订阅方法,并保存到FindState中,代码如下

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        try {
            methods = findState.clazz.getMethods();
        } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
            String msg = "Could not inspect methods of " + findState.clazz.getName();
            if (ignoreGeneratedIndex) {
                msg += ". Please consider using EventBus annotation processor to avoid reflection.";
            } else {
                msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
            }
            throw new EventBusException(msg, error);
        }
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}复制代码

subscribe

  • 继续回到register,查找完订阅方法后便通过遍历调用subscribe,对所有的订阅方法进行注册

//
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    //根据订阅者和订阅方法构造一个订阅事件
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    //根据EventType找到订阅事件,从而去分发事件,处理事件
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        //订阅者已经注册则抛出EventBusException
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    //遍历订阅事件,找到比subscriptions中订阅事件优先级小的位置,然后插进去
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    //通过订阅者获取该订阅者所订阅事件的集合
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    //将当前的订阅事件添加到subscribedEvents中
    subscribedEvents.add(eventType);
    //如果是粘性事件的话,就立即投递、执行
    if (subscriberMethod.sticky) {
        //默认为true
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}复制代码

post

  • 下面来看一下post方法,代码如下

public void post(Object event) {
    //PostingThreadState保存着事件队列和线程状态信息
    PostingThreadState postingState = currentPostingThreadState.get();
    //获取事件队列,并将当前事件插入到事件队列中
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);
    //确保不会被多次执行
    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //遍历处理队列中的事件
            while (!eventQueue.isEmpty()) {
                //post单个事件
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            //重置状态
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}复制代码
  • post方法中调用了postSingleEvent处理单个事件,代码如下

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    //eventInheritance表示是否向上查找事件的父类,默认为true
    if (eventInheritance) {
        //取出 Event 及其父类和接口的 class 列表
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    //找不到该事件时的异常处理
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}复制代码
  • postSingleEvent中又调用了postSingleEventForEventType,代码如下

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //取出该事件对应的Subscription集合
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                //对事件进行处理
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}复制代码
  • postSingleEventForEventType中又调用了postToSubscription,代码如下,通过下面代码我们也可以知道之前介绍五种ThreadMode的不同之处

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    //取出订阅方法的线程模式,之后根据线程模式来分别处理
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            //直接执行 invokeSubscriber() 方法,内部直接采用反射调用
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            //判断当前是否在 UI 线程
            if (isMainThread) {
                //直接采用反射调用
                invokeSubscriber(subscription, event);
            } else {
                //把当前的方法加入到队列之中,然后通过 handler 去发送一个消息,在 handler 的 handleMessage 中去执行方法
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            // 与MAIN类似,不过是确保是顺序执行的
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            //判断当前是否在 UI 线程
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                //直接采用反射调用
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}复制代码
  • postToSubscription处理完成后又会调用invokeSubscriber去回调订阅方法

void invokeSubscriber(PendingPost pendingPost) {
    Object event = pendingPost.event;
    Subscription subscription = pendingPost.subscription;
    PendingPost.releasePendingPost(pendingPost);
    if (subscription.active) {
        invokeSubscriber(subscription, event);
    }
}

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}复制代码

postSticky

  • 看完了post的流程,再来看一下postSticky,代码如下,先将该事件放入 stickyEvents 中,再正常调用post;

public void postSticky(Object event) {
    //为避免多线程操作 postSticky(Object) 和 removeStickyEvent(Class<?>) 引发的冲突,所以对 stickyEvents 对象添加了 synchronized 关键字
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}复制代码

unregister

  • 接下来看一下注销的方法unregister,代码如下

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            //对 subscriptionsByEventType 移除了该 subscriber 的所有订阅信息
            unsubscribeByEventType(subscriber, eventType);
        }
        //移除了注册对象和其对应的所有 Event 事件链表
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}复制代码

番外

为什么使用事件总线机制来替代广播?

  1. 广播:耗时、容易被捕获(不安全);

  2. 事件总线:更节省资源、更高效,能将信息传递给原生以外的各种对象

EventBus2.x的版本和3.x是有很大区别的

  1. 2.x使用的是运行时注解,采用了反射的方式对整个注册的类的所有方法进行扫描来完成注册,因而会对性能有一定影响;

  2. 3.x使用的是编译时注解,Java文件会编译成.class文件,再对class文件进行打包等一系列处理。在编译成.class文件时,EventBus会使用EventBusAnnotationProcessor注解处理器读取@Subscribe()注解并解析、处理其中的信息,然后生成Java类来保存所有订阅者的订阅信息。这样就创建出了对文件或类的索引关系,并将其编入到apk中;

  3. 从EventBus3.0开始使用了对象池缓存减少了创建对象的开销;

跨进程问题

  • 目前EventBus只支持跨线程,而不支持跨进程。如果一个app的service起到了另一个进程中,那么注册监听的模块则会收不到另一个进程的EventBus发出的事件。这里可以考虑利用IPC做映射表,并在两个进程中各维护一个EventBus,不过这样就要自己去维护register和unregister的关系,比较繁琐,而且这种情况下通常用广播会更加方便;

RxBus

  • RxBus不是一个库,而是一个文件,实现只有短短30行代码。RxBus本身不需要过多分析,它的强大完全来自于它基于的RxJava技术;

  • 基于RxJava2的RxBus实现代码如下

public final class RxBus2 {

    private final Subject<Object> bus;

    private RxBus2() {
        // toSerialized method made bus thread safe
        bus = PublishSubject.create().toSerialized();
    }

    public static RxBus2 getInstance() {
        return Holder.BUS;
    }

    private static class Holder {
        private static final RxBus2 BUS = new RxBus2();
    }

    public void post(Object obj) {
        bus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return bus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return bus;
    }

    public boolean hasObservers() {
        return bus.hasObservers();
    }
}复制代码

RxBus 与 EventBus 比较

  • 其实也就是rxJava的优点:

  1. RxJava的Observable有onError、onComplete等状态回调;

  2. RxJava使用组合而非嵌套的方式,避免了回调地狱;

  3. RxJava的线程调度设计的更加优秀,更简单易用;

  4. RxJava可使用多种操作符来进行链式调用来实现复杂的逻辑;

  5. RxJava的信息效率高于EventBus2.x,低于EventBus3.x;

  • 那么技术选型时如何取舍呢?如果项目中使用了RxJava,则使用RxBus,否则使用EventBus3.x;

LiveDataBus

  • LiveDataBus是基于LiveData实现的类似EventBus的消息通信框架,它是基于LiveData实现的,完全可以代替EventBus,RxBus;

为什么会有LiveDataBus呢?

  • Handler : 容易导致内存泄漏,空指针,高耦合,不利于维护

  • EventBus :原理实现复杂,无法混淆,需要手动绑定生命周期

  • RxBus:依赖于RxJava,包太大,影响apk大小,app启动时间

初代实现如下

public final class LiveDataBus {

    private final Map<String, MutableLiveData<Object>> bus;

    private LiveDataBus() {
        bus = new HashMap<>();
    }

    private static class SingletonHolder {
        private static final LiveDataBus DATA_BUS = new LiveDataBus();
    }

    public static LiveDataBus get() {
        return SingletonHolder.DATA_BUS;
    }

    public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
        if (!bus.containsKey(target)) {
            bus.put(target, new MutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(target);
    }

    public MutableLiveData<Object> getChannel(String target) {
        return getChannel(target, Object.class);
    }
}
//使用
注册订阅:
LiveDataBus.get().getChannel("key_test", Boolean.class)
        .observe(this, new Observer<Boolean>() {
            @Override
            public void onChanged(@Nullable Boolean aBoolean) {
            }
        });
发送消息:
LiveDataBus.get().getChannel("key_test").setValue(true);复制代码

LiveDataBus最终实现

public final class LiveDataBus {

    private final Map<String, BusMutableLiveData<Object>> bus;

    private LiveDataBus() {
        bus = new HashMap<>();
    }

    private static class SingletonHolder {
        private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
    }

    public static LiveDataBus get() {
        return SingletonHolder.DEFAULT_BUS;
    }

    public <T> MutableLiveData<T> with(String key, Class<T> type) {
        if (!bus.containsKey(key)) {
            bus.put(key, new BusMutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(key);
    }

    public MutableLiveData<Object> with(String key) {
        return with(key, Object.class);
    }

    private static class ObserverWrapper<T> implements Observer<T> {

        private Observer<T> observer;

        public ObserverWrapper(Observer<T> observer) {
            this.observer = observer;
        }

        @Override
        public void onChanged(@Nullable T t) {
            if (observer != null) {
                if (isCallOnObserve()) {
                    return;
                }
                observer.onChanged(t);
            }
        }

        private boolean isCallOnObserve() {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (stackTrace != null && stackTrace.length > 0) {
                for (StackTraceElement element : stackTrace) {
                    if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
                            "observeForever".equals(element.getMethodName())) {
                        return true;
                    }
                }
            }
            return false;
        }
    }

    private static class BusMutableLiveData<T> extends MutableLiveData<T> {

        private Map<Observer, Observer> observerMap = new HashMap<>();

        @Override
        public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
            super.observe(owner, observer);
            try {
                hook(observer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void observeForever(@NonNull Observer<T> observer) {
            if (!observerMap.containsKey(observer)) {
                observerMap.put(observer, new ObserverWrapper(observer));
            }
            super.observeForever(observerMap.get(observer));
        }

        @Override
        public void removeObserver(@NonNull Observer<T> observer) {
            Observer realObserver = null;
            if (observerMap.containsKey(observer)) {
                realObserver = observerMap.remove(observer);
            } else {
                realObserver = observer;
            }
            super.removeObserver(realObserver);
        }

        private void hook(@NonNull Observer<T> observer) throws Exception {
            //get wrapper's version
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper == null) {
                throw new NullPointerException("Wrapper can not be bull!");
            }
            Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
            Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
            fieldLastVersion.setAccessible(true);
            //get livedata's version
            Field fieldVersion = classLiveData.getDeclaredField("mVersion");
            fieldVersion.setAccessible(true);
            Object objectVersion = fieldVersion.get(this);
            //set wrapper's version
            fieldLastVersion.set(objectWrapper, objectVersion);
        }
    }
}

//注册订阅
LiveDataBus.get()
    .with("key_test", String.class)
    .observe(this, new Observer<String>() {
        @Override
        public void onChanged(@Nullable String s) {
        }
    });
//发送消息
LiveDataBus.get().with("key_test").setValue(s);复制代码
  • 想了解更多LiveDataBus可以参考下面的开源项目:

https://github.com/JeremyLiao/LiveEventBus复制代码

事件总线的考量

  • 其实目前常用的各种事件总线xxBus原理都差不多,那么在项目中如何使用这些事件总线呢:

  1. EventBus,RxBus: 将xxEvent消息容器和事件总线框架的依赖放到base module,其他模块组件依赖于base module; 但是这样每个模块改动都需要增删改baseModule中的消息容器, 组件化要求功能模块独立, 各组件应该尽量避免影响base module;

  2. LiveDataBus: 无需建立消息模型,但无法想前两者一样拥有类名索引,无法引导正确的编写代码,也无法传递自定义实体到其他模块;

  3. 使用EventBus,RxBus,为了更大程度的解耦,可以独立出一个事件总线module,添加事件的实体都在这个module中,base module依赖 这个事件总线module对事件通信的解耦, 抽离事件到事件总线module中减少对base module的影响;


作者:今阳
链接:https://juejin.cn/post/7020671328193609765


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐