。
本文是 Piasy 独立翻译,发表于 http://blog.piasy.com/AdvancedRxJava ,请阅读原文支持原创 http://blog.piasy.com/AdvancedRxJava/2016/08/19/schedulers-2/原文 Schedulers (part 2)
在上文中,我介绍了如何利用 RxJava 已有的类来实现自定义的 scheduler。
在本文中,我将更加深入一层,演示如何操控底层的 ExecutorService
以及 RxJava 其他的基础设施,并与之进行交互,而这些都是无法通过 NewThreadWorker
实现的。
ScheduledAction
类 在 Scheduler
/ Worker
诞生之前的日子里,和 Future
类进行交互是非常直观的:我们只需要用一个 Subscription
类包装 cancel()
就可以了,这样它就能被加入到各种 subscription 容器类中了。
然而引入了 Scheduler
/ Worker
之后就不能这样了,因为我们需要记录这些任务,并且取消它们。一旦 Future
被记录了,那就需要在它们完成或者被取消时取消记录,否则就会发生内存泄漏。这一要求就意味着我们不能直接把一个 Action0
/ Runnable
提交到 ExecutorService
上,我们需要包装一下它们,让它们可以在完成或者被取消时被取消记录。
解决方案就是 ScheduledAction
类。所有常规的 Action0
都会在 NewThreadWorker.scheduleActual()
中被包装为 ScheduledAction
。它的内部包含了一个 SubscriptionList
,用来容纳那些在这个任务完成或者取消时要执行的操作:
public final class ScheduledAction implements Runnable, Subscription { final Action0 action; // (1) final SubscriptionList slist; // (2) public ScheduledAction(Action0 action) { this.action = action; this.slist = new SubscriptionList(); } @Override public void run() { try { action.call(); // (3) } finally { unsubscribe(); // (4) } } @Override public boolean isUnsubscribed() { return slist.isUnsubscribed(); } @Override public void unsubscribe() { slist.unsubscribe(); } public void add(Subscription s) { // (5) slist.add(s); } }
这个类还是非常直观的:
SubscriptionList
就足够了。 ExecutorService
接收的是 Runnable
,所以我们实现 Runnable
接口,并在 run()
函数中执行实际的任务。 unsubscribe()
函数,用来触发执行清理任务。 ScheduledAction
中,所以我们暴露出 SubscriptionList
的 add()
函数。 接下来就是要在任务被提交到 ExecutorService
之前把所有的记录以及清理任务都串起来。为了简单起见,我们假设我们的 ExecutorService 是个单线程的 service。我们将在后面处理多线程的情况。首先让我们看一下自定义 Worker
的结构:
public final class CustomWorker extends Scheduler.Worker { final ExecutorService exec; // (1) final CompositeSubscription tracking; // (2) final boolean shutdown; // (3) public CustomWorker() { exec = Executors.newSingleThreadExecutor(); tracking = new CompositeSubscription(); shutdown = true; } public CustomWorker(ExecutorService exec) { this.exec = exec; tracking = new CompositeSubscription(); shutdown = false; // (4) } @Override public Subscription schedule(Action0 action) { return schedule(action, 0, null); // (5) } @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { // implement } @Override public boolean isUnsubscribed() { return tracking.isUnsubscribed(); // (6) } @Override public void unsubscribe() { if (shutdown) { exec.shutdownNow(); // (7) } tracking.unsubscribe(); } }
目前为止,这个结构还不复杂:
tracking
成员还记录被提交的任务是否已经被取消。 ExecutorService
是我们自己创建的,那我们就将它关闭,然后取消订阅所有提交的任务(注意,如果 service 是我们自己创建的,那其实我们不需要记录提交过来的任务,因为 service 已经记录了,我们可以在 shutdownNow()
中取消订阅它们)。 最后,我们看一下延迟 schedule()
的实现:
// ... @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { if (isUnsubscribed()) { // (1) return Subscriptions.unsubscribed(); } ScheduledAction sa = new ScheduledAction(action); // (2) tracking.add(sa); // (3) sa.add(Subscriptions.create( () -> tracking.remove(sa))); Future<?> f; if (delayTime <= 0) { // (4) f = exec.submit(sa); } else if (exec instanceof ScheduledExecutorService) { // (5) f = ((ScheduledExecutorService)exec) .schedule(sa, delayTime, unit); } else { f = genericScheduler.schedule(() -> { // (6) Future<?> g = exec.submit(sa); sa.add(Subscriptions.create( // (7) () -> g.cancel(false))); }, delayTime, unit); } sa.add(Subscriptions.create( // (8) () -> f.cancel(false))); return sa; // (9) } // ...
本文第一段复杂的代码工作机制如下:
RejectedExecutionException
。你可以把函数中后面的代码都用一个 try-catch
包裹起来,并在异常发生时返回同样的表示已经取消的常量 subscription。 ScheduledAction
。 tracking
中,并且增加一个取消订阅的回调,以便在它执行完毕或者被取消时可以将其从 tracking
中移除。注意,由于幂等性, remove()
不会调用 ScheduledAction
的 unsubscribe()
,从而不会导致死循环。 Future
。 ExecutorService
是 ScheduledExecutorService
,我们就可以直接调用它的 schedule()
函数了。 ScheduledExecutorService
来实现延迟调度了,但我们不能直接把任务调度给它,因为这样它会在错误的线程中执行。我们需要创建一个中间任务,它将在延迟结束之后,向正确的线程池调度一个即时的任务。 Future
能在 unsubscribe()
调用时被取消。这里我们把内部的 Future
加入到了 ScheduledAction
中。 Future
加入到 ScheduledAction
中( 通过把 Future#cancel()
包装到一个 Subscription
中 )。在这里,你就可以控制是否需要强行( 中断 )取消了。(RxJava 会根据取消订阅时所处的线程来决定:如果取消订阅就是在执行任务的线程中,那就没必要中断了) ScheduledAction
也是任务发起方用来取消订阅的凭证(token)。 由于 subscription 容器类的终结状态特性,即便(7)和(8)发生在 ScheduledAction
被取消订阅之后,它们也会立即被取消订阅。至于更加激进的需求,你可以在 ScheduledAction#run()
中在执行实际任务之前检查是否已经被取消订阅。
最后缺失的一点代码就是 genericScheduler
了。你可以为 worker 添加一个 static final
成员,并像下面这样设置:
// ... static final ScheduledExecutorService genericScheduler; static { genericScheduler = Executors.newScheduledThreadPool(1, r -> { Thread t = new Thread(r, "GenericScheduler"); t.setDaemon(true); return t; }); } // ...
在本文中,我演示了如何把一个任务包装为一个可以被调度的任务,并且如何让它们可以在单个任务层面以及整个 worker 层面被取消。
在本系列的最后一篇文章中,我将讲解如何处理多线程的 ExecutorService
,因为我们不能让非延迟调度的任务执行时乱序甚至并发执行。
欢迎大家关注我的微信公众号,会推送最新 blog、以及短篇内容。