文章列表:
- EventBus源码剖析(1) – 订阅注册与注销
- EventBus源码剖析(2) – EventBusBuilder
- EventBus源码剖析(3) – 线程模式
- EventBus源码剖析(4) – 订阅记录
- EventBus源码剖析(5) – Poster
一、Poster
这是所有 Poster 实现的共同接口,包含一个抽象方法。子类实现该抽象方法后,可接收到订阅记录 Subscription 和事件 Object。实现类需要根据自身特性,把事件按照既定模式发送给订阅者的接收方法。
1
2
3
4
5
6
interface Poster {
// 把需要发送给指定Subscription的事件加到队列中
// @param subscription 事件记录Subscription
// @param event 发送给订阅者的事件
void enqueue(Subscription subscription, Object event);
}
所有实现子类:
二、AsyncPoster
在后台异步投递事件,每个使用 AsyncPoster 的 Runnable 都有自己的线程,适合耗时但不占用处理器时间片的io操作,任务运行完毕后线程归还给线程池。
AsyncPoster 和 BackgroundPoster 共享同一个 EventBus 线程池,该线程池类型为 Executors.newCachedThreadPool()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class AsyncPoster implements Runnable, Poster {
// 事件投递队列
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
// 向队列存入订阅记录和事件,并激活线程池进行事件派发
public void enqueue(Subscription subscription, Object event) {
// 创建PendingPost实例,存入订阅记录subscription和事件event
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// PendingPost实例放入队列等待派发
queue.enqueue(pendingPost);
// 任务放入线程池等待执行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
// 从事件投递队列获取一个任务执行
PendingPost pendingPost = queue.poll();
// pendingPost不能为空
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
// 把事件发送给订阅者
eventBus.invokeSubscriber(pendingPost);
}
}
三、BackgroundPoster
3.1 BackgroundPoster
BackgroundPoster 实现后台投递事件。BackgroundPoster 本身同时实现 Runnable 接口,这样就可以把类实例直接送到线程池中执行。线程池从事件队列获取需要派发的任务并执行。
BackgroundPoster 只有一个运行线程,按任务进入队列的顺序依次执行,适合大量短小的任务。如果队列没有任务,该 Runnable 会退出,线程也会归还给线程池。从前文介绍可知,线程池实现是 Executors.newCachedThreadPool()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
// executor是否正在运行
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
// 订阅记录和其订阅的事件进队
public void enqueue(Subscription subscription, Object event) {
// 从PendingPost缓存池中获取缓存对象,用于保存subscription和event
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 把设置完毕的PendingPost实例存入队列中
synchronized (this) {
queue.enqueue(pendingPost);
// 激活运行
if (!executorRunning) {
executorRunning = true;
// 实现了Runnable接口,所以可以直接在线程池中执行
eventBus.getExecutorService().execute(this);
}
}
}
// 把本类的实例放入线程池之后,由线程池调度执行
@Override
public void run() {
try {
try {
// 循环执行,直到完成PendingPostQueue队列里所有任务
while (true) {
// 从PendingPostQueue队列获取PendingPost实例
PendingPost pendingPost = queue.poll(1000);
// 获取超时会出现PendingPost实例为空
if (pendingPost == null) {
synchronized (this) {
// 加锁后再到队列获取PendingPost实例
pendingPost = queue.poll();
// 队列中已经没有任务,退出执行
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 在线程池的线程中执行事件派发
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
// 本方法在线程池执行过程中被中断,捕获InterruptedException
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
// 结束运行,把executorRunning设置为false
executorRunning = false;
}
}
}
3.2 PendingPost
再看看存放订阅信息和事件 PendingPost 类的内部构造。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final class PendingPost {
// 所有PendingPost共享相同ArrayList<PendingPost>缓存池,最大容量为10000
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
// 将要处理的事件
Object event;
// 接收事件的订阅者信息
Subscription subscription;
// 指向下一个PendingPost实例的引用,在PendingPostQueue中使用
PendingPost next;
// 构造方法,可知构造新实例时next为null
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
// 从缓存池中获取缓存实例,并把订阅信息和事件放入取得的实例中
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
// 缓存池非空
if (size > 0) {
// 从缓存池中获取最后一个实例
PendingPost pendingPost = pendingPostPool.remove(size - 1);
// 放入通知事件
pendingPost.event = event;
// 放入订阅信息
pendingPost.subscription = subscription;
// 初始化next为null
pendingPost.next = null;
return pendingPost;
}
}
// 缓存池没有已缓存的实例,直接创建新实例
return new PendingPost(event, subscription);
}
// PendingPost负载的事件已经发送给订阅方法,回收PendingPost到缓存池
static void releasePendingPost(PendingPost pendingPost) {
// 相关数据成员置空
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// 限制缓存池最大容量为10000,避免缓存池无限扩容
if (pendingPostPool.size() < 10000) {
// 缓存对象放入缓存队列中
pendingPostPool.add(pendingPost);
}
}
}
}
3.3 PendingPostQueue
AsyncPoster 和 BackgroundPoster 拥有各自的 PendingPostQueue,用于存放所有待完成的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
// 存入PendingPost实例
synchronized void enqueue(PendingPost pendingPost) {
// 不能向队列存入空任务
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
// 队列已有其他任务,新任务放入队列尾部
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
// 队列是空的,新任务直接进队
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
// 同步从队列中获取任任务
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
// 等待maxMillisToWait毫秒或被notifyAll,再从队列取任务
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
四、HandlerPoster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class HandlerPoster extends Handler implements Poster {
// 处理队列
private final PendingPostQueue queue;
// 执行消息的超时时间
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
// 构造方法
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
// 存入PendingPost实例
public void enqueue(Subscription subscription, Object event) {
// 创建PendingPost实例,存入订阅记录和事件
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// PendingPost实例放入队列等待派发
queue.enqueue(pendingPost);
// handler没在运行
if (!handlerActive) {
// 向Handler发送一个Message
handlerActive = true;
// 发送Message失败
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
// 从队列中获取任务
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// 在同步下再次检查
pendingPost = queue.poll();
// 队列中没有任务
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 已获得任务,把任务的事件发给对应订阅方法
eventBus.invokeSubscriber(pendingPost);
// 计算接收者方法处理事件耗费的时长
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
五、MainThreadSupport
在主线程投递事件,在Android中是UI线程。如果在其他系统中使用,可以自行指定特定线程为”主线程”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
// 内部类实现外部接口
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}