Kahn's blogs

EventBus3.0源代码全解析

2017/09/14

网上优秀博客:
EventBus 源码解析 2.x
EventBus 源码解析 3.0

本文知识点解释:

  1. getDeclaredMethod()获取的是类自身声明的所有方法,包含public、protected和private方法。getMethod()获取的是类的所有共有方法,这就包括自身的所有public方法,和从基类继承的、从接口实现的所有public方法。所以eventbus在最新版本中有个优化举措,先使用getMethod,如果抛异常(android bug),再使用getDeclaredMethod,因为getDeclaredMethod非常耗时,特别像activity这种很大的类。分析源码正文中有详细解释。
  2. 查找方法时的修饰符过滤。这里eventbus不但过滤了源代码中可用的修饰符,还过滤了会在字节码中生成的修饰符

    1
    2
    Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
    In newer class files, compilers may add methods. Those are called bridge or synthetic methods.

    EventBus must ignore both. There modifiers are not public but defined in the Java class file format:

  3. 巧用isAssignableFrom方法。instanceof运算符只被用于对象引用变量,检查左边的被测试对象是不是右边类或接口的 实例化。如果被测对象是null值,则测试结果总是false。
    形象地:自身实例或子类实例 instanceof 自身类 返回true
    例:

    1
    2
    String s=new String("javaisland"); 
    System.out.println(s instanceof String); //true

    Class类的isInstance(Object obj)方法,obj是被测试的对象,如果obj是调用这个方法的class或接口 的实例,则返回true。这个方法是instanceof运算符的动态等价。
    形象地:自身类.class.isInstance(自身实例或子类实例) 返回true
    例:

    1
    2
    String s=new String("javaisland"); 
    System.out.println(String.class.isInstance(s)); //true

    Class类的isAssignableFrom(Class cls)方法,如果调用这个方法的class或接口 与 参数cls表示的类或接口相同,或者是参数cls表示的类或接口的父类,则返回true。 形象地:自身类.class.isAssignableFrom(自身类或子类.class) 返回true
    例:

    1
    2
    System.out.println(ArrayList.class.isAssignableFrom(Object.class));  //false 
    System.out.println(Object.class.isAssignableFrom(ArrayList.class)); //true

    http://stackoverflow.com/questions/3949260/java-class-isinstance-vs-class-isassignablefrom

  4. 多线程单例的实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /** Convenience singleton for apps using a process-wide EventBus instance. */
    public static EventBus getDefault() {
    if (defaultInstance == null) {
    synchronized (EventBus.class) {
    if (defaultInstance == null) {
    defaultInstance = new EventBus();
    }
    }
    }
    return defaultInstance;
    }
    static volatile EventBus defaultInstance;
    双重锁检查机制
  5. 使用建造者模式构造对象

  6. 多个EventBus对象事件互相隔离
  7. // Don’t let the pool grow indefinitelyif (pendingPostPool.size() < 10000) {
    pendingPostPool.add(pendingPost);
    }
  8. 并发-Java中的Copy-On-Write容器
    1. private final Map, CopyOnWriteArrayList> subscriptionsByEventType;该容器存放事件和Subscription的对应关系
  9. eventType.cast(stickyEvents.get(eventType));
    强转使用Class类的方法,而不是直接用语法强转

开始了

基本流程

  1. 使用建造者模式初始化单例对象
  2. 注册监听者,并储存监听者里面的监听方法信息
  3. 派发事件给监听者
  4. 移除监听者

解读源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

初始化方法,把一些List,map都初始化掉。可以看到储存stickyEvents粘性事件的map,是线程安全的。
也初始化了几个线程的派发器,如主线程调度用的HandlerPoster,后台线程的BackgroundPoster,HandlerPoster继承自Handler,使用mainLooper创建,可以保证其派发的方法都在主线程内,内部具体原理下面分析。

1
2
3
4
5
6
7
8
9
10
/** * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they * are no longer interested in receiving events. * <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}

注册监听者方法,注释里说,监听者必须有监听方法(带Subscribe注解的方法)。
findSubscriberMethods顾名思义,查找监听方法,返回值也是一个SubscriberMethod的list,SubscriberMethod包装了监听方法的method对象,线程模式,事件类型(就是该方法的参数类型),优先级和是否为粘性事件。

1
2
3
4
5
6
7
8
9
10
SubscriberMethod类中使用checkMethodString方法来重写equals方法
private synchronized void checkMethodString() {
if (methodString == null) {
// Method.toString has more overhead, just take relevant parts of the method StringBuilder builder = new StringBuilder(64);
builder.append(method.getDeclaringClass().getName());
builder.append('#').append(method.getName());
builder.append('(').append(eventType.getName());
methodString = builder.toString();
}
}

继续说SubscriberMethodFinder#findSubscriberMethods,SubscriberMethodFinder对象是在EventBus对象初始化的时候完成初始化的,使用了build里面的几个成员变量,
boolean ignoreGeneratedIndex;
boolean strictMethodVerification;
List subscriberInfoIndexes;
默认都是false和空,暂不深入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

METHOD_CACHE是一个ConcurrentHashMap,缓存用
因为ignoreGeneratedIndex默认为false,所以我们先看findUsingInfo是怎么生成List

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

先获取了一个FindState对象,来看看这个FindState拿出和释放过程
prepareFindState,getMethodsAndRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}

private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

作者在EventBus项目中多次用到这种对象池的实现方法
实现的基本流程就是池中如果有该对象,就取出(是真正的取出,拿出对象后把池中的该位置置空),释放的时候把需要获取的最终数据取出,再释放FindState,然后放回的池中。
对象池可以减少中间对象FindState的创建,回收,以达到减少GC目的。
继续说findUsingInfo
findState.initForSubscriber会把findstate的数据初始化一下
下面的while循环就真正的去查找并缓存监听方法了!!
第一次进去findState.subscriberInfo一定为空的,所以我们先看findUsingReflectionInSingleClass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

这个方法有点长,我们慢慢看
获取methods的地方有一个特殊处理,getDeclaredMethods方法不会获取父类的方法,所以会快很多但是偶尔会报出java.lang.NoClassDefFoundError这是一个android的bug
获取到所有method后,遍历method,只过滤出只有一个参数的公用方法,获取注解上的数据,然后生成SubscriberMehtod对象,并保存下来
strictMethodVerification是不是严苛验证,如果是严苛模式,带EvnetBus注解,并且不是有一个参数,或者不是公有方法,就抛异常。
里面有一个findState.checkAdd方法,来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required. // Usually a subscriber doesn't have methods listening to the same event type. Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}

跟我们平常存取map数据(没有就存,有不存)的模式不太一样。这种方法更快速!之前我们用的都是先查看map中有没有,有才存。
checkAdd使用了两个map来进行校验,anyMethodByEventType校验事件类型(参数)没有的话直接通过,有的话使用subscriberClassByMethodKey校验所属对象和方法签名
因为已经通过了验证,anyMethodByEventType.put(eventType, this)为的是清除一下老的数据,省去了else里面的抛异常验证流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());

String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class return true;
} else {
// Revert the put, old class is further down the class hierarchy subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

是否保存该方法的校验
使用方法名和参数名为key进行校验,如果第一次添加(methodClassOld == null),或者新方法所属的类是老方法所属类的子类,都允许添加(此处不太明白)。
从findUsingReflectionInSingleClass出来,再看findUsingInfo往下的流程,调用了findState.moveToSuperclass()方法,该方法会递归向上给findState力的clazz变量赋值,直到系统类时置空。clazz为空时才能跳出findUsingInfo里面的循环。
到此一个被注册的类的所有监听方法就被加载缓存了起来,在findState对象的subscriberMethods里面,返回给EvnetBus的register方法,还记得findSubscriberMethods后面的代码吗?

1
2
3
4
5
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}

来看下subscribe方法,方法名为订阅,那就是订阅的意思咯,订阅完,整个register方法就结束了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Must be called in synchronized blockprivate void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

Subscription为订阅该事件的对象和SubscriberMethod对象的包装类,subscriptionsByEventType对象则以事件为key,保存了所有含有该事件的Subscription对象

1
2
3
4
5
6
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

一种插入排序写法,根据优先级插入到List当中,优先级大的插在前面,如果到最后优先级都不大于的话,插入最后。注意,会动态改变list大小的时候,size要先赋值再使用。
typesBySubscriber以对象为key,value是该对象订阅的事件类型
最后派发粘性事件,也就是粘性事件的实现原理:当一个订阅者新注册上来,查看有没有粘性事件,有就派发给它。
整个注册流程结束了,几个重要的list也都有了数据。下面看下最终要的-Post派发事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** Posts the given event to the event bus. */public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

ThreadLocal应该很熟悉了,currentPostingThreadState就是用ThreadLocal实现的,能保证每个线程取出的变量都是属于该线程的,内部实现原理是使用线程对象做的key,有兴趣可以看下ThreadLocal的实现。
它的应用地方有很多,比如主线程的Loop。服务器端做数据库事务的时候也有应用到它,具体场景为,服务器每处理一个请求为一个线程,一个请求过来,在收到请求的入口出取出一个数据库连接,打开事务,经过一些列业务处理后,给客户端返回数据的时候,取出该连接,关闭事务。
PostingThreadState为当前线程的发送状态,和需要发送的数据,和目标对象(订阅者),currentPostingThreadState新建的时候重写了ThreadLocal的initialValue方法,所以第一次取出对象会新建,而不是为空。
先把需要发送的事件存入当前线程的事件队列,查看是否正在发送,没有正在发送,继续往下走,正在发送的话,整个方法就结束了。不用担心,此时数据已经加入到了队列当中。
可以看到,第一次启动的时候,如果就取消,EventBus会抛异常的。
再来看postSingleEvent。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

可以看到,多次使用的eventInheritance变量,意思是要不要匹配继承关系,默认值为true。
lookupAllEventTypes会把该次事件对象的所有实现接口,父类,都缓存下来,并分别匹配
举例:派发事件类A,lookupAllEventTypes会把事件A的实现接口interfaceA,interfaceB,superA,superB,全部缓存。
下来就遍历缓存的这个list,去查找所有的订阅者的订阅事件,并派发出去。如果类B订阅了事件interfaceB,也可以收到该事件(看代码是这样的,未去实测)
如果没有找到订阅者,会发出一个NoSubscriberEvent事件,再给你一个接收的机会!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

进一步的分发方法,找到事件对应的所有订阅者,挨个派发,先上车(postingState这个是车),再使用postToSubscription方法派发,可以看到中途可以取消班车(但是你不知道取消班车的时候,班车走到哪了,所以取消操作不能控制细节)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

厉害了!!终于到这了,这里是真正的分发方法了!!
Subscription中含有订阅者对象,和它订阅其中一个方法。在此时其实我们找到的是已经匹配好的,跟派发事件相同的订阅方法了。
一个一个来
先看

1
2
3
4
5
6
7
8
9
10
11
12
13
case POSTING:
invokeSubscriber(subscription, event);
break;
里面调用了
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

直接反射,说明POSTING,使用的当前线程回调!

1
2
3
4
5
6
7
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;

如果当前是主线程,直接回调,如果不是,使用了mainThreadPoster(HandlerPoster)这个东西。前面提到过这个类,来看下它具体是怎么实现的
final class HandlerPoster extends Handler
继承了Handler
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
使用了主线程的Loop,这么做,它里面派发的事件,都会在主线程里面执行。再来看下它的队列是怎么维护的。

1
2
3
4
5
6
7
8
9
10
11
12
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

获取一个PendingPost,放进了queue当中,如果没在活动,就发送消息,消息发送失败抛异常。
先看PendingPost是个什么鬼,再看queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
PendingPost next;

private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}

static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}

}

看源码可知,就是一个简单的队列。这里又用了对象池技术。值得注意的细节是,它控制了池大小,我们开发中容易忽略的地方。
queue是类PendingPostQueue的对象,这个类是一个连接器,有头有尾。看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;

synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}

synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}

synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}

}

有两个poll方法,第一个取头,第二个是待超时的。第一个不带超时的方法,是在主线程里面调用的。这里有个困惑,tail没有地方用到。
回看HandlerPoster,先调用了
queue.enqueue(pendingPost);queue.enqueue里面把头尾都赋了值,再去发送消息。这时候要看handleMessage里面是怎么取,怎么处理的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Overridepublic void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

启动循环后,从queue中取出刚刚放进去的一个PendingPost,如果为空就在同步块中重新取,真的没取到就结束循环。
取到数据了,调用了EventBus里面的invokeSubscriber,参数为PendingPost

1
2
3
4
5
6
7
8
/** * Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions * between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the * subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the * live cycle of an Activity or Fragment. */void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}

看注释我们知道了,在主线程回调的时候,要查看该订阅者是否存活,这对Activity,Fragment尤为重要。试想这样一种场景,
如果订阅者是Activity的话,生命周期已经走完了(onDestory了),有个事件已经在looper里面了(事件先进入了looper,activity后被回收),在回调时,没有判断该事件是否存活,这时,在订阅事件方法中,用到的所有view对象都是无效的。麻烦大了!
POSTING为什么不用这个方法呢?因为是在主线程中发的消息,直接回调了,不存在进入looper的情况。
来吧,继续我们的handleMessage

1
2
3
4
5
6
7
8
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}

started是在while循环外取的。这个是什么意思呢?maxMillisInsideHandleMessage(初始化的值为10毫秒),如果本次执行时间超过10毫秒,重新发送循环消息来执行handlerMessage,并结束本次执行。
这点就很有意思了。意思是,在10毫秒内,尽量处理事件,能处理多少处理多少,一旦超过10毫秒,停止本次handlerMessage的执行。我们知道,这次handlerMessage其实是主线程中的一次looper事件的执行,作者让结束本次handlerMessage,再启用一个looper事件触发handlerMessage,可以极大的防止ANR,这就涉及到了ANR的原理。我们有时候,也可以学这种处理方式。如果非要在主线程执行的话,可以多个事件分开执行,不然一次looper的循环超过了固定的事件,就会ANR了。
主线程的派发过程分析完毕

1
2
3
4
5
6
7
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;

后台线程派发过程,如果是主线程,就让backgroundPoster去执行,不是主线程,直接回调执行。
看下BackgroundPoster的实现,它实现了Runnable接口,和HandlerPoster实现差不多。它的运行在EventBus里面的一个线程池中。执行过程中不同的地方就是,run方法里面它是使用了PendingPostQueue对象的带超时的poll方法来获取PendingPost对象。如果没有数据,1秒钟会被等待,这时queue.enqueue(pendingPost)会把该线程中断掉,重新执行一个runnable。这个就没有时间的限制了,会一直处理,把所有事件处理完。

1
2
3
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;

跟BACKGROUND不同的是,不管是不是在主线程发送的事件,都会让它在线程池中运行

至此,整个派发过程就真的真的结束了

移除订阅(监听)

1
2
3
4
5
6
7
8
9
10
/** Unregisters the given subscriber from all event classes. */public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}

就是挨个移除。这里面可以说下unsubscribeByEventType里面的List的迭代删除方法。以后可以这么用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

就是这样,i—-,size–,安全,快捷。
错误示范(我们的代码里就有人这么写!)

1
2
3
 for (int i = 0; i < list.size(); i++) {
list.remove(i);
}

也可以这么搞,正确示范

1
2
3
for (int i = list.size() - 1; i >= 0; i--) {
list.remove(i);
}

至此EventBus这个订阅者模式的库源码就全部走完了,还有一些取消粘性事件,普通事件的方法,自行观看,很简单。

总结:

开头的知识点都是要注意的地方。
作者java功底深厚,各种类型list,map应用合理,线程同步块也做的恰到好处,做到尽量小的包含代码。
代码里面随处可见的对象池重用写法,也很好的解决了大量产生中间对象时的GC问题。还有一次looper执行的message执行拆分,也有益于ANR的减少。