Android源码系列(17) -- QueuedWork

November 5, 2018

一、类签名

这是个内部工具类,用于跟踪那些未完成的或尚未结束的全局任务,新任务通过方法 queue 加入。添加 finisher 的runnables,由 waitToFinish 方法保证执行,用于保证任务已被处理完成。

这个类用于 SharedPreference 编辑后修改异步写入到磁盘,所以设计一个在 Activity.onPause 或类似地方等待写入操作机制,而这个机制也能用于其他功能。所有排队的异步任务都在一个独立、专用的线程上处理。

1
public class QueuedWork

二、常量

可延迟 Runnable 的延迟值。该值设置得尽可能大,但也需要足够小,令延迟难以察觉。

1
private static final long DELAY = 100;

waitToFinish() 运行超过 MAX_WAIT_TIME_MILLIS 毫秒,发出警告

1
private static final long MAX_WAIT_TIME_MILLIS = 512;

本类使用的锁对象

1
private static final Object sLock = new Object();

三、静态变量

用此锁约束任何时候都只有一个线程在处理一个任务,意味着执行顺序就是任务添加的顺序。

因为任务执行过程中会一直持有 sProcessingWork 锁,所以 sProcessingWork 独立于 sLock 避免阻塞整个类的运转

1
private static Object sProcessingWork = new Object();

存放 Finisher 的链表。通过 added 方法添加,或 removed 方法移除

1
2
@GuardedBy("sLock")
private static final LinkedList<Runnable> sFinishers = new LinkedList<>();

QueuedWork 内部的静态 HandlerThread

1
2
@GuardedBy("sLock")
private static Handler sHandler = null;

存放通过 queue 方法添加任务的队列

1
2
@GuardedBy("sLock")
private static final LinkedList<Runnable> sWork = new LinkedList<>();

指示新任务是否能被延迟,默认为 true

1
2
@GuardedBy("sLock")
private static boolean sCanDelay = true;

任务执行前等待的时间

1
2
3
4
@GuardedBy("sLock")
private final static ExponentiallyBucketedHistogram
        mWaitTimes = new ExponentiallyBucketedHistogram(
        16);

正在等待处理的任务数

1
private static int mNumWaits = 0;

四、静态方法

4.1 getHandler()

HandlerThread 实现细节请参考 Android源码系列(11) – HandlerThread。该文章已经提到 HandlerThread 本质是线程,启动 HandlerThread 即启动子线程,并让任务在该线程上执行。

1
2
3
4
5
6
7
8
9
10
11
12
private static Handler getHandler() {
    synchronized (sLock) {
        if (sHandler == null) {
            HandlerThread handlerThread = new HandlerThread("queued-work-looper",
                    Process.THREAD_PRIORITY_FOREGROUND);
            handlerThread.start();

            sHandler = new QueuedWorkHandler(handlerThread.getLooper());
        }
        return sHandler;
    }
}

4.2 addFinisher(Runnable finisher)

添加 finisher-runnable 等待 queue() 异步处理任务,由 SharedPreferences$Editor.startCommit() 使用

应注意的是,这个方法不会引起 finisher 的执行。这只是调用者的集合,让调用者异步跟踪任务的最新状态。在大多数情况下,调用者(如:SharedPreferences) 会在 add() 后很快又调用 remove()。这些任务实际在 waitToFinish() 执行

1
2
3
4
5
public static void addFinisher(Runnable finisher) {
    synchronized (sLock) {
        sFinishers.add(finisher);
    }
}

4.3 removeFinisher(Runnable finisher)

移除一个已经添加的 finisher-runnable

1
2
3
4
5
public static void removeFinisher(Runnable finisher) {
    synchronized (sLock) {
        sFinishers.remove(finisher);
    }
}

4.4 waitToFinish()

触发排队任务马上执行。排队任务在独立的线程异步执行。任务和 finishers 都在这个线程上运行,finishers 通过这种方式实现检查排队任务是否完成。

Activity.onPause()BroadcastReceiver.onReceive()之后处理Service.command之后 都会调用此方法,所以异步任务永远不会丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void waitToFinish() {
    long startTime = System.currentTimeMillis();
    boolean hadMessages = false;

    Handler handler = getHandler();

    synchronized (sLock) {
        if (handler.hasMessages(QueuedWorkHandler.MSG_RUN)) {
            // 延迟任务会在processPendingWork()中处理
            handler.removeMessages(QueuedWorkHandler.MSG_RUN);
        }

        // 不要延迟任何任务,因为这样可能会推迟finishers的执行
        sCanDelay = false;
    }

    StrictMode.ThreadPolicy oldPolicy = StrictMode.allowThreadDiskWrites();
    try {
        // 所有等待任务运行完毕
        processPendingWork();
    } finally {
        StrictMode.setThreadPolicy(oldPolicy);
    }

    try {
        while (true) {
            Runnable finisher;
            
            // 从sFinishers获取一个Finisher
            synchronized (sLock) {
                finisher = sFinishers.poll();
            }

            if (finisher == null) {
                break;
            }
            
            // 执行Finisher
            finisher.run();
        }
    } finally {
        // 所有finisher运行完毕
        sCanDelay = true;
    }

    // 统计任务执行的总时间和递增的等待次数
    synchronized (sLock) {
        long waitTime = System.currentTimeMillis() - startTime;

        if (waitTime > 0 || hadMessages) {
            mWaitTimes.add(Long.valueOf(waitTime).intValue());
            mNumWaits++;
        }
    }
}

ActivityThread 类内共有五段源码调用 QueuedWork.waitToFinish();

位置一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void handleServiceArgs(ServiceArgsData data) {
    Service s = mServices.get(data.token);
    if (s != null) {
        try {
            if (data.args != null) {
                data.args.setExtrasClassLoader(s.getClassLoader());
                data.args.prepareToEnterProcess();
            }
            int res;
            if (!data.taskRemoved) {
                res = s.onStartCommand(data.args, data.flags, data.startId);
            } else {
                s.onTaskRemoved(data.args);
                res = Service.START_TASK_REMOVED_COMPLETE;
            }

            QueuedWork.waitToFinish();

            try {
                ActivityManager.getService().serviceDoneExecuting(
                        data.token, SERVICE_DONE_EXECUTING_START, data.startId, res);
            } catch (RemoteException e) {
                throw e.rethrowFromSystemServer();
            }
            ensureJitEnabled();
        } catch (Exception e) {
            if (!mInstrumentation.onException(s, e)) {
                throw new RuntimeException(
                        "Unable to start service " + s
                        + " with " + data.args + ": " + e.toString(), e);
            }
        }
    }
}

位置二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void handleStopService(IBinder token) {
    Service s = mServices.remove(token);
    if (s != null) {
        try {
            if (localLOGV) Slog.v(TAG, "Destroying service " + s);
            s.onDestroy();
            s.detachAndCleanUp();
            Context context = s.getBaseContext();
            if (context instanceof ContextImpl) {
                final String who = s.getClassName();
                ((ContextImpl) context).scheduleFinalCleanup(who, "Service");
            }

            QueuedWork.waitToFinish();

            try {
                ActivityManager.getService().serviceDoneExecuting(
                        token, SERVICE_DONE_EXECUTING_STOP, 0, 0);
            } catch (RemoteException e) {
                throw e.rethrowFromSystemServer();
            }
        } catch (Exception e) {
            if (!mInstrumentation.onException(s, e)) {
                throw new RuntimeException(
                        "Unable to stop service " + s
                        + ": " + e.toString(), e);
            }
            Slog.i(TAG, "handleStopService: exception for " + token, e);
        }
    } else {
        Slog.i(TAG, "handleStopService: token=" + token + " not found.");
    }
    //Slog.i(TAG, "Running services: " + mServices);
}

位置三:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void handlePauseActivity(IBinder token, boolean finished, boolean userLeaving,
        int configChanges, PendingTransactionActions pendingActions, String reason) {
    ActivityClientRecord r = mActivities.get(token);
    if (r != null) {
        if (userLeaving) {
            performUserLeavingActivity(r);
        }

        r.activity.mConfigChangeFlags |= configChanges;
        performPauseActivity(r, finished, reason, pendingActions);

        // Make sure any pending writes are now committed.
        if (r.isPreHoneycomb()) {
            QueuedWork.waitToFinish();
        }
        mSomeActivitiesChanged = true;
    }
}

位置四:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void handleStopActivity(IBinder token, boolean show, int configChanges,
        PendingTransactionActions pendingActions, boolean finalStateRequest, String reason) {
    final ActivityClientRecord r = mActivities.get(token);
    r.activity.mConfigChangeFlags |= configChanges;

    final StopInfo stopInfo = new StopInfo();
    performStopActivityInner(r, stopInfo, show, true /* saveState */, finalStateRequest,
            reason);

    if (localLOGV) Slog.v(
        TAG, "Finishing stop of " + r + ": show=" + show
        + " win=" + r.window);

    updateVisibility(r, show);

    // Make sure any pending writes are now committed.
    if (!r.isPreHoneycomb()) {
        QueuedWork.waitToFinish();
    }

    stopInfo.setActivity(r);
    stopInfo.setState(r.state);
    stopInfo.setPersistentState(r.persistentState);
    pendingActions.setStopInfo(stopInfo);
    mSomeActivitiesChanged = true;
}

位置五:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// TODO: This method should be changed to use {@link #performStopActivityInner} to perform to
// stop operation on the activity to reduce code duplication and the chance of fixing a bug in
// one place and missing the other.
private void handleSleeping(IBinder token, boolean sleeping) {
    ActivityClientRecord r = mActivities.get(token);

    if (r == null) {
        Log.w(TAG, "handleSleeping: no activity for token " + token);
        return;
    }

    if (sleeping) {
        if (!r.stopped && !r.isPreHoneycomb()) {
            callActivityOnStop(r, true /* saveState */, "sleeping");
        }

        // Make sure any pending writes are now committed.
        if (!r.isPreHoneycomb()) {
            QueuedWork.waitToFinish();
        }

        // Tell activity manager we slept.
        try {
            ActivityManager.getService().activitySlept(r.token);
        } catch (RemoteException ex) {
            throw ex.rethrowFromSystemServer();
        }
    } else {
        if (r.stopped && r.activity.mVisibleFromServer) {
            r.activity.performRestart(true /* start */, "handleSleeping");
            r.setState(ON_START);
        }
    }
}

4.5 queue(Runnable work, boolean shouldDelay)

安排任务异步执行。work 是新加入的任务,shouldDelay 指明本任务是否能延迟处理。

SharedPreferences.Editor.appy() 源码间接调用此方法,调用时 shouldDelaytrue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void queue(Runnable work, boolean shouldDelay) {
    Handler handler = getHandler();

    synchronized (sLock) {
        // 任务放入sWork列表中
        sWork.add(work);

        // 然后放入一个Message作为通知
        if (shouldDelay && sCanDelay) {
            // sCanDelay默认为true,支持延迟100ms
            handler.sendEmptyMessageDelayed(QueuedWorkHandler.MSG_RUN, DELAY);
        } else {
            // 放入一个马上触发运行的消息
            handler.sendEmptyMessage(QueuedWorkHandler.MSG_RUN);
        }
    }
}

4.6 hasPendingWork()

当队列存在等待的异步任务时返回 true

1
2
3
4
5
public static boolean hasPendingWork() {
    synchronized (sLock) {
        return !sWork.isEmpty();
    }
}

4.7 processPendingWork()

从任务队列中取任务执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static void processPendingWork() {
    // 此处使用sProcessingWork锁,保证sWork
    synchronized (sProcessingWork) {
        LinkedList<Runnable> work;

        synchronized (sLock) {
            // 克隆sWork为work实例
            work = (LinkedList<Runnable>) sWork.clone();
            // 清除sWork
            sWork.clear();

            // 利用QueuedWorkHandler.MSG_RUN移除相关Messages
            // 避免以后收到类似通知,而任务却已经完成执行
            getHandler().removeMessages(QueuedWorkHandler.MSG_RUN);
        }
        
        // 根据work任务排列顺序依次执行
        if (work.size() > 0) {
            for (Runnable w : work) {
                w.run();
            }
        }
    }
}

五、QueuedWorkHandler

收到 msg.what == MSG_RUN 消息通知就去调用processPendingWork()方法。该 HandlerLooper 调用, Looper 又在子线程,所以 Handler 在子线程执行任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static class QueuedWorkHandler extends Handler {
    static final int MSG_RUN = 1;

    QueuedWorkHandler(Looper looper) {
        super(looper);
    }

    public void handleMessage(Message msg) {
        // 收到Message
        if (msg.what == MSG_RUN) {
            // 触发方法调用,把sWork内所有任务批量送去执行
            processPendingWork();
        }
    }
}