EventBus源码流程解析(eventbus实现原理)
1. 使用详解
注册(最好在activity或者fragment中的onStart()方法以及onStop方法中注册)
解绑,在对应的生命周期方法中。
发送消息
接收消息首先,需要新建一个类,作为发送消息的对象
ThreadMode.MAIN 表示这个方法在主线程中执行。事件的处理会在UI线程中执行。事件处理时间不能太长,长了会ANR的。
ThreadMode.BACKGROUND表示该方法在后台执行,不能并发处理.如果事件是在UI线程中发布出来的,那么该事件处理函数就会在新的线程中运行,如果事件本来就是子线程中发布出来的,那么该事件处理函数直接在发布事件的线程中执行。在此事件处理函数中禁止进行UI更新操作。
ThreadMode.ASYNC,表示在后台执行,可以异步并发处理.无论事件在哪个线程发布,该事件处理函数都会在新建的子线程中执行,同样,此事件处理函数中禁止进行UI更新操作。
ThreadMode.POSTING,表示该方法和消息发送方在同一个线程中执行。如果使用事件处理函数指定了线程模型为POSTING,那么该事件在哪个线程发布出来的,事件处理函数就会在这个线程中运行,也就是说发布事件和接收事件在同一个线程。在线程模型为POSTING的事件处理函数中尽量避免执行耗时操作,因为它会阻塞事件的传递,甚至有可能会引起ANR。
public final String message; public MessageEvent(String message) { this.message = message; } 复制代码
其次,在需要发送消息的activity中或者fragment中
EventBus.getDefault().post(new MessageEvent("这是使用EventBus进行通信的")); 复制代码
最后在接受消息的activity或者fragment中注册EventBus,并且设置接受消息的方法。首先需要注册以及取消EventBus时间
@Override public void onStart() { super.onStart(); EventBus.getDefault().register(this); } @Override public void onDestroy() { super.onDestroy(); EventBus.getDefault().unregister(this); } @Subscribe(threadMode = ThreadMode.MAIN) public void getMsg(MessageEvent messageEvent){ textView.setText(messageEvent.message); } 复制代码
2.EventBus3.0粘性事件
在MainActivity中新写一个方法用来处理粘性事件:
@Subscribe(threadMode = ThreadMode.POSTING,sticky = true) public void ononMoonStickyEvent(MessageEvent messageEvent){ tv_message.setText(messageEvent.getMessage()); // 如果在接收后,从stickEvents中移除粘性事件,可以这样: if (messageEvent!= null) { EventBus.getDefault().removeStickyEvent(messageEvent); } } 复制代码
发送粘性事件
在SecondActivity中我们定义一个Button来发送粘性事件:
bt_subscription.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { EventBus.getDefault().postSticky(new MessageEvent("粘性事件")); finish(); } }); 复制代码
2. 源码解析
register
|---EventBus | |---register(Object subscriber) | | |--- subscriberMethodFinder.findSubscriberMethods(subscriberClass) | | |---subscribe(subscriber, subscriberMethod); |---findUsingReflectionInSingleClass | |--- Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);//找到带有注解的方法 | |---SubscriberMethod() // 找到方法后,并将属性封装进这个对象。 复制代码
EventBus的原理是观察者模式,所以,首先需要有的是注册订阅者,其次是发送事件。
注册订阅者用到的就是register方法。
EventBus.getDefault().register(this);
首先分析getDefault()
,通过它获取EventBus的单例,(实现方法是双重锁方法),内部还有个builder,使用了builder设计模式。然后进入到register方法
public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); //获取到订阅者类(Activity/Fragment) List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); //通过一个subscribeMethod用去来装。 synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } } // ???? SubscriberMethodFinder // private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>(); List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } if (ignoreGeneratedIndex) { // ???? subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } } private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { findUsingReflectionInSingleClass(findState); findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } // ???? private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); // ???? findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } } 复制代码
SubscriberMethod()
new SubscriberMethod(method, // 方法名 eventType, // 方法参数 threadMode, // 注解中的信息 subscribeAnnotation.priority(), subscribeAnnotation.sticky()) 复制代码
findUsingReflectionInSingleClass
去解析注册者对象的所有方法。并且拿到带有注解的方法。 判断含有注解的方法是否是public
,判断带有注解的方法的变量是否只有一个.然后通过annotation解析所有细节参数,包括ThreadMode,Priority,sticky,参数的类型。吧这些参数封装成一个对象SubscriberMethod
。
重点:
在获取完subscriberMethods之后,在register中进行遍历
synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } 复制代码
然后绑定关系
// Must be called in synchronized block private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // 将注册的类以及sbcribeMethod绑定 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions);// 将parmeter类型与subcriptions绑定 } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); // ???? 被订阅的方法接收的参数是否是粘性的 if (subscriberMethod.sticky) { // ???? 是否开启类继承 if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don't take care of here. postToSubscription(newSubscription, stickyEvent, isMainThread()); } } 复制代码
postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { // 根据不同的threadMode 反射执行 invokeSubscriber switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { // ???? 1、如果已经处于UI线程,则直接调用 invokeSubscriber(subscription, event); } else { //???? 2、如果非UI线程,则调用mainThreadPoster处理 mainThreadPoster.enqueue(subscription, event); break; case MAIN_ORDERED: if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster not decoupled from subscriber invokeSubscriber(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } } void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } } 复制代码
如果是posting,直接调用invokeSubscriber,如果是MAIN,则需要判断发送事件的线程是否是主线程,如果是,直接调用invokeSubscriber,如果不是,需要向mainThreadPoster中enqueue(),
mainThreadPoster实际上是一个handler
HandlerPoster
new HandlerPoster(eventBus, looper, 10) //???? 本身是一个Handler,也是一个Post接口的实现类 public class HandlerPoster extends Handler implements Poster { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { //将subscription和event构建成一个PendingPost对象 PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { //将pendingPost插入到事件链表的尾部 queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; //发送消息,执行handleMessage if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { //执行开始计时 long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } //进行事件处理 eventBus.invokeSubscriber(pendingPost); //如果在规定的时间内还没有处理完毕,则继续回调handleMessage处理 long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } } 复制代码
可以看出handleMessage对事件链表的处理是不断循环获取链表中的PendingPost,当链表中的事件处理完毕后退出while循环。或者是在规定的时间内maxMillisInsideHandleMessage没有将事件处理完,则继续调用sendMessage。继而重新回调handleMessage方法,事件会继续执行。这是为了避免队列过长或者事件耗时较长时,while循环阻塞主线程造成卡顿。算是个小技巧。
BackgroundPoster
如果是在子线程中发送事件,则需要用到BackgroundPoster
final class BackgroundPoster implements Runnable, Poster { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } } 复制代码
post
postSingleEvent->postSingleEventForEventType->postToSubscription
poststicky
postSticky的逻辑简单明了,首先将事件存入stickyEvents里,然后调用普通事件的post方法,执行与普通post相同的逻辑。
读者看到这里,不知道会不会有疑问:
粘性事件的发布和普通事件的发布好像没什么区别,它是怎么做到粘性的效果呢? 在register的时候,有调用了checkPostStickyEventToSubscription方法,该方法和postSticky里的post方法最终都调用了postToSubscription,这样不就重复了吗? 读者请仔细观察,在postSticky方法里,是有把事件缓存到stickyEvents中的。postSticky里的post方法,是订阅者的订阅事件的动作发生在粘性事件发布之前,此时粘性事件相当于普通事件,所以直接调用post处理即可。但如果订阅者的订阅事件的动作发生在粘性事件发布之后呢?此时就轮到stickyEvents起作用了,在注册方法里,会取出所有未处理的缓存的粘性事件,发送到checkPostStickyEventToSubscription处理,这样不就达到粘性事件的处理效果了吗?
其实这种场景在实际开发中是很常见的。比如我们在某个非UI线程发布一个事件,在订阅该事件的订阅方法里去启动某个Activity,此时,由于事件发布和Activity启动不在同一个线程,且Activity启动的速度和事件发布的速度也不可知,就有可能出现两种情况(发布普通事件的场景):
Activity先起来,事件后发送,此时没有添加sticky=true的订阅方法可以处理到该事件 Activity后起来,事件先发送,此时没有添加sticky=true的订阅方法就处理不到该事件了 而粘性事件正好可以处理场景2的需求。
结束语 那么,有聪明的读者就会说了:那直接把应用中的所有事件都以粘性事件发布,这样不就可以高枕无忧了?
其实不然。首先,粘性事件是以Map缓存的,事件越多,越占内存;其次,每次注册执行register时,都会将所有缓存的粘性事件调用订阅方法处理,如果处理过的粘性事件没有清除,对于应用功能可能是有影响的。建议读者不要图省事而种下祸根。
重要
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;
作者:AaronLing
链接:https://juejin.cn/post/7032921700719788068