文章列表:
- EventBus源码剖析(1) – 订阅注册与注销
- EventBus源码剖析(2) – EventBusBuilder
- EventBus源码剖析(3) – 线程模式
- EventBus源码剖析(4) – 订阅记录
- EventBus源码剖析(5) – Poster
一、简介
1.1 特性
EventBus 是为 Android 而设的 publish/subscribe (发布/订阅) 消息系统。事件通过 post(Object) 提交到总线,总线把事件投递给订阅者。而该订阅者,需包含匹配类型消息的处理方法。
为了能接收事件,订阅者需通过 register(Object) 把自己注册到事件总线上。一旦成功注册,订阅者可以持续接收关心的事件,直至通过 unregister(Object) 结束订阅。
处理事件的方法需满足以下条件:
- 用 @Subscribe 进行注解;
- 方法可见性为 public;
- 方法返回值为 void;
- 仅含有一个参数,且形参为事件类;
由于把 Subscription 翻译为名词性的“订阅”后,和字面动词性“订阅”没法区分。所以本系列文章把该词翻译为更贴切的名词:“订阅记录”,这个词会将在后续文章继续沿用。
1.2 优点
- 简化不同组件间通讯
- 对事件发送者和接收者进行解耦
- 与Activities、 Fragments、和后台线程运行良好
- 避免复杂、易错的依赖和生命周期问题
- 令实现代码更简洁
- 运行速度快
- 库体积小 (约50KB)
- 已经过累计 100,000,000+ 安装量的应用验证
- 包含消息分发线程、订阅者优先级等高级特性
官方文档没有提及,EventBus 在进程(VM)创建单例,所有消息送到同进程的消息中心,由消息中心分发给同进程的实例。运行在不同进程的页面,没法收到来自其他进程的消息。要实现跨进程消息分发,必须使用 Socket、Binder 等方法。
1.3 版本
EventBus 自17年年底开始,基本停止代码提交,可认为 EventBus 功能稳定无变动。处于这种状况的开源库非常合适进行源码剖析,而本篇文章基于当时最新正式版 3.1.1 开展。
二、用法
2.1 订阅者
订阅者要在适当的生命周期时,把自己注册到消息总线。由于事件的基本接收单位是方法,所以需要给接收事件的方法添加注解,以便 EventBus 通过注解发现该方法。
接收者方法需要遵循以下规则:
- 使用 EventBus 的注解修饰方法;
- 方法不能为 private,才能让 EventBus 获取该方法;
- 方法必须只有一个参数,且参数类型就是所关心事件的类型;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
EventBus.getDefault().register(this)
}
// 注册类必须包含至少一个接收事件的方法
@Subscribe(threadMode = ThreadMode.MAIN, sticky = false, priority = 0)
fun onEventReceived(event: UserEvent) {
val name = event.name
val age = event.age
Log.i(TAG, "Event received, Name: $name, Age: $age.")
}
override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(this)
}
}
每个接收者类实例只需向 EventBus 注册一次。当接收者类对事件不再关心时,也需要在合适时间点注销订阅。
2.2 发布者
对事件发布者来说事情就简单多了,构建目标事件,把数据或负载内容封装在事件内发出即可。
1
2
3
4
fun postEvent() {
val user = UserEvent("Mike", 24)
EventBus.getDefault().post(user)
}
2.3 事件消息体
这是示例的消息体,消息体包含用户的名字和年龄
1
class UserEvent(val name: String, val age: Int)
如果消息只是简单通知,事件类甚至可以不含任何数据成员。例如:
1
class Notification
三、初始化
3.1 单例
整个 EventBus 通过以下方法创建单例。所有在此单例发送的事件,只对注册在单例里的订阅者有效。为保证所有事件能在同一个 EventBus 内流动,一定要从此方法获取 EventBus 实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class EventBus {
// 变量使用volatile修饰
static volatile EventBus defaultInstance;
// 同一进程内有效,双重检验锁实现单例
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
}
调用 getDefault() 方法时,同类两个常量也获得初始化。
1
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
事件类型缓存,实例为 HashMap<Class<?>, List<Class<?>>>
1
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();
3.2 基础构造
单例的初始化调用此构造方法,然后方法内又调用另一个构造方法:
1
2
3
public EventBus() {
this(DEFAULT_BUILDER);
}
3.3 深入构造
构造过程对以下数据成员进行赋值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 按照事件类型分类订阅,事件类型与对应的订阅记录
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 按照订阅者类型分类
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 粘性事件
private final Map<Class<?>, Object> stickyEvents;
// 用于提供访问主线程的能力
private final MainThreadSupport mainThreadSupport;
// 主线程消息分发器
private final Poster mainThreadPoster;
// 后台线程消息分发器
private final BackgroundPoster backgroundPoster;
// 异步消息发布器
private final AsyncPoster asyncPoster;
// 订阅者方法查找器
private final SubscriberMethodFinder subscriberMethodFinder;
// ExecutorService
private final ExecutorService executorService;
private final boolean throwSubscriberException;
private final boolean logSubscriberExceptions;
private final boolean logNoSubscriberMessages;
private final boolean sendSubscriberExceptionEvent;
private final boolean sendNoSubscriberEvent;
private final boolean eventInheritance;
// 索引总计
private final int indexCount;
// 日志类
private final Logger logger;
构造方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
EventBus(EventBusBuilder builder) {
// 从EventBusBuilder获取Logger
logger = builder.getLogger();
// 初始化容器
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
// 从EventBusBuilder获取MainThreadSupport
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
// 构建BackgroundPoster
backgroundPoster = new BackgroundPoster(this);
// 构建AsyncPoster
asyncPoster = new AsyncPoster(this);
// 注解处理器所生成订阅者信息的数量
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
// 初始化订阅者方法查找器初始化
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
// 初始化各种标志位
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
// ExecutorService
executorService = builder.executorService;
}
3.4 PostingThreadState
单例初始化过程还初始化了 ThreadLocal 实例
1
2
3
4
5
6
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
PostingThreadState 是变量的封装,放在 ThreadLocal 中,以便快速设置、获取当前线程状态的多个变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final static class PostingThreadState {
// 当前线程的事件队列
final List<Object> eventQueue = new ArrayList<>();
// 消息是否在投递中
boolean isPosting;
// 是否为主线程
boolean isMainThread;
// 订阅记录
Subscription subscription;
// 事件
Object event;
// 是否已被取消投递
boolean canceled;
}
四、注册事件
4.1 register
所有订阅者通过此方法向 EventBus 注册,以便在注册后收取所关心的事件。注销订阅则通过方法 unregister(Object),这样观察者可在不关心事件的时候取消订阅。
从下面的实现大概可知,如果订阅者在生命周期结束后不移除订阅者,页面会出现内存泄漏。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void register(Object subscriber) {
// 获取订阅者Class
Class<?> subscriberClass = subscriber.getClass();
// 根据订阅者类型获取接收事件的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 逐个注册订阅者中的订阅方法
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
4.2 subscribe
此方法在同步块中调用,把订阅方法按照订阅事件分类。过程会检查订阅者是否出现重复注册问题,后根据设置决定是否分发粘性事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 订阅者方法的事件类型,即订阅者方法唯一参数
Class<?> eventType = subscriberMethod.eventType;
// 构建新记录
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 用事件类型获取所有订阅记录列表
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
// 订阅记录列表为空,进行初始化
subscriptions = new CopyOnWriteArrayList<>();
// 保存事件类型和对应订阅记录
subscriptionsByEventType.put(eventType, subscriptions);
} else {
// 检查该订阅者是否出现重复注册
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
// 获取同一事件的订阅记录数量
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
// 根据方法注解设置的priority值降序排序,并插入新记录事件记录
// priority值越大越优先接收消息,默认值为0
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 通过订阅者类型获取,该订阅者已订阅事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
// 如果订阅者首次注册则该列表为空,并存入刚订阅的事件
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
// 同一订阅者订阅的事件
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 向订阅者订阅事件列表增加新事件类型
subscribedEvents.add(eventType);
// 订阅方法的sticky为true,把历史粘性事件发送给新订阅方法
if (subscriberMethod.sticky) {
// eventInheritance为true,表示订阅subscriberMethod子类消息的订阅者也接收粘性事件
if (eventInheritance) {
// 消息类型所有子类的已存在粘性事件,都需要受到关注
// 有非常多粘性事件时进行事件遍历是低效的,数据结构需在遍历上变得更高效
// 例如:使用额外的map存储父类的子类:Class -> List<Class>
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
// 候选事件类型
Class<?> candidateEventType = entry.getKey();
// 检查eventType是否为candidateEventType的父类或同类
if (eventType.isAssignableFrom(candidateEventType)) {
// 把子类事件发送给注册为父类事件的订阅者
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
// 仅发送类型完全匹配的订阅事件
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
4.3 checkPostStickyEventToSubscription
投递粘性事件给订阅者。如果订阅者尝试终止该事件会失败,因为事件没法通过投递状态进行跟踪。
1
2
3
4
5
6
7
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
// 检查粘性事件是否为null
if (stickyEvent != null) {
// 把粘性事件作为普通事件发送给订阅者
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
4.4 postToSubscription
ThreadMode 中几种类别的主要含义在后续文章详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 获取订阅者方法指定线程的类别
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 处于主线程就直接触发订阅者
invokeSubscriber(subscription, event);
} else {
// 处在其他线程,向主线程Handler发送消息
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
// 优先放入主线程的消息队列等待处理
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 当前线程是主线程,把事件放入后台线程处理队列
backgroundPoster.enqueue(subscription, event);
} else {
// 现在是后台线程,直接调用订阅者方法
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 放入异步队列处理
asyncPoster.enqueue(subscription, event);
break;
default:
// 传入未知threadMode
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
上述注册流程简化为如下流程图:
五、注销订阅
5.1 unregister
把指定订阅者从消息总线中注销
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public synchronized void unregister(Object subscriber) {
// 订阅者订阅的事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 逐个注销订阅者订阅的事件
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
// 从typesBySubscriber移除subscriber
typesBySubscriber.remove(subscriber);
} else {
// 此订阅者之前未曾注册过,却进行注销操作
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
5.2 unsubscribeByEventType
只更新 subscriptionsByEventType,而不是 typesBySubscriber,因为 typesBySubscriber 由调用者更新。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 根据事件类型获取订阅记录
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
// 订阅记录数量
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
// 查找并移除所有相关订阅记录
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
// subscription设置为不活动
subscription.active = false;
// 从列表中移除
subscriptions.remove(i);
i--;
size--;
}
}
}
}
上述注销流程简化为如下流程图:
下一章将继续介绍 EventBus 初始化过程中,EventBusBuilder 的作用。