网上优秀博客:EventBus 源码解析 2.x EventBus 源码解析 3.0
本文知识点解释:
getDeclaredMethod()获取的是类自身声明的所有方法,包含public、protected和private方法。getMethod ()获取的是类的所有共有方法,这就包括自身的所有public方法,和从基类继承的、从接口实现的所有public方法。所以eventbus在最新版本中有个优化举措,先使用getMethod,如果抛异常(android bug),再使用getDeclaredMethod,因为getDeclaredMethod非常耗时,特别像activity这种很大的类。分析源码正文中有详细解释。
查找方法时的修饰符过滤。这里eventbus不但过滤了源代码中可用的修饰符,还过滤了会在字节码中生成的修饰符
1
2
Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
In newer class files , compilers may add methods . Those are called bridge or synthetic methods .
EventBus must ignore both. There modifiers are not public but defined in the Java class file format:
巧用isAssignableFrom方法。instanceof运算符只被用于对象引用变量,检查左边的被测试对象是不是右边类或接口的 实例化。如果被测对象是null值,则测试结果总是false。 形象地:自身实例或子类实例 instanceof 自身类 返回true 例:
1
2
String s=new String("javaisland" );
System.out.println(s instanceof String);
Class类的isInstance(Object obj)方法,obj是被测试的对象,如果obj是调用这个方法的class或接口 的实例,则返回true。这个方法是instanceof运算符的动态等价。 形象地:自身类.class.isInstance(自身实例或子类实例) 返回true 例:
1
2
String s=new String("javaisland" );
System.out.println(String.class.isInstance(s));
Class类的isAssignableFrom(Class cls)方法,如果调用这个方法的class或接口 与 参数cls表示的类或接口相同,或者是参数cls表示的类或接口的父类,则返回true。 形象地:自身类.class.isAssignableFrom(自身类或子类.class) 返回true 例:
1
2
System.out.println(ArrayList.class.isAssignableFrom(Object.class));
System.out.println(Object.class.isAssignableFrom(ArrayList.class));
http://stackoverflow.com/questions/3949260/java-class-isinstance-vs-class-isassignablefrom
多线程单例的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
public static EventBus getDefault () {
if (defaultInstance == null ) {
synchronized (EventBus.class) {
if (defaultInstance == null ) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
static volatile EventBus defaultInstance;
双重锁检查机制
使用建造者模式构造对象
多个EventBus对象事件互相隔离
// Don’t let the pool grow indefinitelyif (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); }
并发-Java中的Copy-On-Write容器
private final Map, CopyOnWriteArrayList> subscriptionsByEventType;该容器存放事件和Subscription的对应关系
eventType.cast(stickyEvents.get(eventType)); 强转使用Class类的方法,而不是直接用语法强转
开始了 基本流程
使用建造者模式初始化单例对象
注册监听者,并储存监听者里面的监听方法信息
派发事件给监听者
移除监听者
解读源码 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this , Looper.getMainLooper(), 10 );
backgroundPoster = new BackgroundPoster(this );
asyncPoster = new AsyncPoster(this );
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0 ;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
初始化方法,把一些List,map都初始化掉。可以看到储存stickyEvents粘性事件的map,是线程安全的。 也初始化了几个线程的派发器,如主线程调度用的HandlerPoster,后台线程的BackgroundPoster,HandlerPoster继承自Handler,使用mainLooper创建,可以保证其派发的方法都在主线程内,内部具体原理下面分析。
1
2
3
4
5
6
7
8
9
10
* Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */
public void register (Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this ) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
注册监听者方法,注释里说,监听者必须有监听方法(带Subscribe注解的方法)。 findSubscriberMethods顾名思义,查找监听方法,返回值也是一个SubscriberMethod的list,SubscriberMethod包装了监听方法的method对象,线程模式,事件类型(就是该方法的参数类型),优先级和是否为粘性事件。
1
2
3
4
5
6
7
8
9
10
SubscriberMethod类中使用checkMethodString方法来重写equals方法
private synchronized void checkMethodString () {
if (methodString == null ) {
builder.append(method.getDeclaringClass().getName());
builder.append('#' ).append(method.getName());
builder.append('(' ).append(eventType.getName());
methodString = builder.toString();
}
}
继续说SubscriberMethodFinder#findSubscriberMethods,SubscriberMethodFinder对象是在EventBus对象初始化的时候完成初始化的,使用了build里面的几个成员变量, boolean ignoreGeneratedIndex; boolean strictMethodVerification; List subscriberInfoIndexes; 默认都是false和空,暂不深入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<SubscriberMethod> findSubscriberMethods (Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null ) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation" );
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
METHOD_CACHE是一个ConcurrentHashMap,缓存用 因为ignoreGeneratedIndex默认为false,所以我们先看findUsingInfo是怎么生成List的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<SubscriberMethod> findUsingInfo (Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null ) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null ) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
先获取了一个FindState对象,来看看这个FindState拿出和释放过程 prepareFindState,getMethodsAndRelease
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private List<SubscriberMethod> getMethodsAndRelease (FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0 ; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null ) {
FIND_STATE_POOL[i] = findState;
break ;
}
}
}
return subscriberMethods;
}
private FindState prepareFindState () {
synchronized (FIND_STATE_POOL) {
for (int i = 0 ; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null ) {
FIND_STATE_POOL[i] = null ;
return state;
}
}
}
return new FindState();
}
作者在EventBus项目中多次用到这种对象池的实现方法 实现的基本流程就是池中如果有该对象,就取出(是真正的取出,拿出对象后把池中的该位置置空),释放的时候把需要获取的最终数据取出,再释放FindState,然后放回的池中。 对象池可以减少中间对象FindState的创建,回收,以达到减少GC目的。 继续说findUsingInfo findState.initForSubscriber会把findstate的数据初始化一下 下面的while循环就真正的去查找并缓存监听方法了!! 第一次进去findState.subscriberInfo一定为空的,所以我们先看findUsingReflectionInSingleClass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void findUsingReflectionInSingleClass (FindState findState) {
Method[] methods;
try {
} catch (Throwable th) {
findState.skipSuperClasses = true ;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0 ) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1 ) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null ) {
Class<?> eventType = parameterTypes[0 ];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract" );
}
}
}
这个方法有点长,我们慢慢看 获取methods的地方有一个特殊处理,getDeclaredMethods方法不会获取父类的方法,所以会快很多但是偶尔会报出java.lang.NoClassDefFoundError这是一个android的bug 获取到所有method后,遍历method,只过滤出只有一个参数的公用方法,获取注解上的数据,然后生成SubscriberMehtod对象,并保存下来 strictMethodVerification是不是严苛验证,如果是严苛模式,带EvnetBus注解,并且不是有一个参数,或者不是公有方法,就抛异常。 里面有一个findState.checkAdd方法,来看一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean checkAdd (Method method, Class<?> eventType) {
if (existing == null ) {
return true ;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
}
}
return checkAddWithMethodSignature(method, eventType);
}
}
跟我们平常存取map数据(没有就存,有不存)的模式不太一样。这种方法更快速!之前我们用的都是先查看map中有没有,有才存。 checkAdd使用了两个map来进行校验,anyMethodByEventType校验事件类型(参数)没有的话直接通过,有的话使用subscriberClassByMethodKey校验所属对象和方法签名 因为已经通过了验证,anyMethodByEventType.put(eventType, this)为的是清除一下老的数据,省去了else里面的抛异常验证流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean checkAddWithMethodSignature (Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0 );
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>' ).append(eventType.getName());
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
} else {
return false ;
}
}
是否保存该方法的校验 使用方法名和参数名为key进行校验,如果第一次添加(methodClassOld == null),或者新方法所属的类是老方法所属类的子类,都允许添加(此处不太明白)。 从findUsingReflectionInSingleClass出来,再看findUsingInfo往下的流程,调用了findState.moveToSuperclass()方法,该方法会递归向上给findState力的clazz变量赋值,直到系统类时置空。clazz为空时才能跳出findUsingInfo里面的循环。 到此一个被注册的类的所有监听方法就被加载缓存了起来,在findState对象的subscriberMethods里面,返回给EvnetBus的register方法,还记得findSubscriberMethods后面的代码吗?
1
2
3
4
5
synchronized (this ) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
来看下subscribe方法,方法名为订阅,那就是订阅的意思咯,订阅完,整个register方法就结束了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null ) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
int size = subscriptions.size();
for (int i = 0 ; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break ;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null ) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
Subscription为订阅该事件的对象和SubscriberMethod对象的包装类,subscriptionsByEventType对象则以事件为key,保存了所有含有该事件的Subscription对象
1
2
3
4
5
6
for (int i = 0 ; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break ;
}
}
一种插入排序写法,根据优先级插入到List当中,优先级大的插在前面,如果到最后优先级都不大于的话,插入最后。注意,会动态改变list大小的时候,size要先赋值再使用。 typesBySubscriber以对象为key,value是该对象订阅的事件类型 最后派发粘性事件,也就是粘性事件的实现原理:当一个订阅者新注册上来,查看有没有粘性事件,有就派发给它。 整个注册流程结束了,几个重要的list也都有了数据。下面看下最终要的-Post派发事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void post (Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true ;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset" );
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0 ), postingState);
}
} finally {
postingState.isPosting = false ;
postingState.isMainThread = false ;
}
}
}
ThreadLocal应该很熟悉了,currentPostingThreadState就是用ThreadLocal实现的,能保证每个线程取出的变量都是属于该线程的,内部实现原理是使用线程对象做的key,有兴趣可以看下ThreadLocal的实现。 它的应用地方有很多,比如主线程的Loop。服务器端做数据库事务的时候也有应用到它,具体场景为,服务器每处理一个请求为一个线程,一个请求过来,在收到请求的入口出取出一个数据库连接,打开事务,经过一些列业务处理后,给客户端返回数据的时候,取出该连接,关闭事务。 PostingThreadState为当前线程的发送状态,和需要发送的数据,和目标对象(订阅者),currentPostingThreadState新建的时候重写了ThreadLocal的initialValue方法,所以第一次取出对象会新建,而不是为空。 先把需要发送的事件存入当前线程的事件队列,查看是否正在发送,没有正在发送,继续往下走,正在发送的话,整个方法就结束了。不用担心,此时数据已经加入到了队列当中。 可以看到,第一次启动的时候,如果就取消,EventBus会抛异常的。 再来看postSingleEvent。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent (Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false ;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0 ; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this , event));
}
}
}
可以看到,多次使用的eventInheritance变量,意思是要不要匹配继承关系,默认值为true。 lookupAllEventTypes会把该次事件对象的所有实现接口,父类,都缓存下来,并分别匹配 举例:派发事件类A,lookupAllEventTypes会把事件A的实现接口interfaceA,interfaceB,superA,superB,全部缓存。 下来就遍历缓存的这个list,去查找所有的订阅者的订阅事件,并派发出去。如果类B订阅了事件interfaceB,也可以收到该事件(看代码是这样的,未去实测) 如果没有找到订阅者,会发出一个NoSubscriberEvent事件,再给你一个接收的机会!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean postSingleEventForEventType (Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this ) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false ;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null ;
postingState.subscription = null ;
postingState.canceled = false ;
}
if (aborted) {
break ;
}
}
return true ;
}
return false ;
}
进一步的分发方法,找到事件对应的所有订阅者,挨个派发,先上车(postingState这个是车),再使用postToSubscription方法派发,可以看到中途可以取消班车(但是你不知道取消班车的时候,班车走到哪了,所以取消操作不能控制细节)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void postToSubscription (Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break ;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break ;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break ;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break ;
default :
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
厉害了!!终于到这了,这里是真正的分发方法了!! Subscription中含有订阅者对象,和它订阅其中一个方法。在此时其实我们找到的是已经匹配好的,跟派发事件相同的订阅方法了。 一个一个来 先看
1
2
3
4
5
6
7
8
9
10
11
12
13
case POSTING:
invokeSubscriber(subscription, event);
break ;
里面调用了
void invokeSubscriber (Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception" , e);
}
}
直接反射,说明POSTING,使用的当前线程回调!
1
2
3
4
5
6
7
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break ;
如果当前是主线程,直接回调,如果不是,使用了mainThreadPoster(HandlerPoster)这个东西。前面提到过这个类,来看下它具体是怎么实现的 final class HandlerPoster extends Handler 继承了Handler mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); 使用了主线程的Loop,这么做,它里面派发的事件,都会在主线程里面执行。再来看下它的队列是怎么维护的。
1
2
3
4
5
6
7
8
9
10
11
12
void enqueue (Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this ) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true ;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message" );
}
}
}
}
获取一个PendingPost,放进了queue当中,如果没在活动,就发送消息,消息发送失败抛异常。 先看PendingPost是个什么鬼,再看queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost (Object event, Subscription subscription) {
this .event = event;
this .subscription = subscription;
}
static PendingPost obtainPendingPost (Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0 ) {
PendingPost pendingPost = pendingPostPool.remove(size - 1 );
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null ;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost (PendingPost pendingPost) {
pendingPost.event = null ;
pendingPost.subscription = null ;
pendingPost.next = null ;
synchronized (pendingPostPool) {
pendingPostPool.add(pendingPost);
}
}
}
}
看源码可知,就是一个简单的队列。这里又用了对象池技术。值得注意的细节是,它控制了池大小,我们开发中容易忽略的地方。 queue是类PendingPostQueue的对象,这个类是一个连接器,有头有尾。看源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue (PendingPost pendingPost) {
if (pendingPost == null ) {
throw new NullPointerException("null cannot be enqueued" );
}
if (tail != null ) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null ) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail" );
}
notifyAll();
}
synchronized PendingPost poll () {
PendingPost pendingPost = head;
if (head != null ) {
head = head.next;
if (head == null ) {
tail = null ;
}
}
return pendingPost;
}
synchronized PendingPost poll (int maxMillisToWait) throws InterruptedException {
if (head == null ) {
wait(maxMillisToWait);
}
return poll();
}
}
有两个poll方法,第一个取头,第二个是待超时的。第一个不带超时的方法,是在主线程里面调用的。这里有个困惑,tail没有地方用到。 回看HandlerPoster,先调用了 queue.enqueue(pendingPost);queue.enqueue里面把头尾都赋了值,再去发送消息。这时候要看handleMessage里面是怎么取,怎么处理的了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Overridepublic void handleMessage (Message msg) {
boolean rescheduled = false ;
try {
long started = SystemClock.uptimeMillis();
while (true ) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null ) {
synchronized (this ) {
if (pendingPost == null ) {
handlerActive = false ;
return ;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message" );
}
rescheduled = true ;
return ;
}
}
} finally {
handlerActive = rescheduled;
}
}
启动循环后,从queue中取出刚刚放进去的一个PendingPost,如果为空就在同步块中重新取,真的没取到就结束循环。 取到数据了,调用了EventBus里面的invokeSubscriber,参数为PendingPost
1
2
3
4
5
6
7
8
void invokeSubscriber (PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
看注释我们知道了,在主线程回调的时候,要查看该订阅者是否存活,这对Activity,Fragment尤为重要。试想这样一种场景, 如果订阅者是Activity的话,生命周期已经走完了(onDestory了),有个事件已经在looper里面了(事件先进入了looper,activity后被回收),在回调时,没有判断该事件是否存活,这时,在订阅事件方法中,用到的所有view对象都是无效的。麻烦大了! POSTING为什么不用这个方法呢?因为是在主线程中发的消息,直接回调了,不存在进入looper的情况。 来吧,继续我们的handleMessage
1
2
3
4
5
6
7
8
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message" );
}
rescheduled = true ;
return ;
}
started是在while循环外取的。这个是什么意思呢?maxMillisInsideHandleMessage(初始化的值为10毫秒),如果本次执行时间超过10毫秒,重新发送循环消息来执行handlerMessage,并结束本次执行。 这点就很有意思了。意思是,在10毫秒内,尽量处理事件,能处理多少处理多少,一旦超过10毫秒,停止本次handlerMessage的执行。我们知道,这次handlerMessage其实是主线程中的一次looper事件的执行,作者让结束本次handlerMessage,再启用一个looper事件触发handlerMessage,可以极大的防止ANR,这就涉及到了ANR的原理。我们有时候,也可以学这种处理方式。如果非要在主线程执行的话,可以多个事件分开执行,不然一次looper的循环超过了固定的事件,就会ANR了。 主线程的派发过程分析完毕
1
2
3
4
5
6
7
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break ;
后台线程派发过程,如果是主线程,就让backgroundPoster去执行,不是主线程,直接回调执行。 看下BackgroundPoster的实现,它实现了Runnable接口,和HandlerPoster实现差不多。它的运行在EventBus里面的一个线程池中。执行过程中不同的地方就是,run方法里面它是使用了PendingPostQueue对象的带超时的poll方法来获取PendingPost对象。如果没有数据,1秒钟会被等待,这时queue.enqueue(pendingPost)会把该线程中断掉,重新执行一个runnable。这个就没有时间的限制了,会一直处理,把所有事件处理完。
1
2
3
case ASYNC:
asyncPoster.enqueue(subscription, event);
break ;
跟BACKGROUND不同的是,不管是不是在主线程发送的事件,都会让它在线程池中运行
至此,整个派发过程就真的真的结束了
移除订阅(监听)
1
2
3
4
5
6
7
8
9
10
public synchronized void unregister (Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null ) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
就是挨个移除。这里面可以说下unsubscribeByEventType里面的List的迭代删除方法。以后可以这么用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void unsubscribeByEventType (Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null ) {
int size = subscriptions.size();
for (int i = 0 ; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false ;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
就是这样,i—-,size–,安全,快捷。 错误示范(我们的代码里就有人这么写!)
1
2
3
for (int i = 0 ; i < list.size(); i++) {
list.remove(i);
}
也可以这么搞,正确示范
1
2
3
for (int i = list.size() - 1 ; i >= 0 ; i--) {
list.remove(i);
}
至此EventBus这个订阅者模式的库源码就全部走完了,还有一些取消粘性事件,普通事件的方法,自行观看,很简单。
总结: 开头的知识点都是要注意的地方。 作者java功底深厚,各种类型list,map应用合理,线程同步块也做的恰到好处,做到尽量小的包含代码。 代码里面随处可见的对象池重用写法,也很好的解决了大量产生中间对象时的GC问题。还有一次looper执行的message执行拆分,也有益于ANR的减少。