RxJava提供了Reactive Programming for Java,个人在Android开发中用的相当多,尤其线程切换和链式的数据处理,给码农们提供了极大的便利。在线程切换方面,之前一直用subscribeOn和observeOn配合,不过最近工作需要对线程更加细致的考量,比如zipWith的线程执行由谁决定,于是再重新编码理一遍。
package com.opticalix.theme.zepp; import com.opticalix.base.BaseRunner; import com.opticalix.theme.synchronize.PrefixThreadFactory; import com.opticalix.util.Logger; import rx.Observable; import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; import java.util.concurrent.Executors; public class RxRunner implements BaseRunner { private final Scheduler mSchedulerIO; private final Scheduler mSchedulerIO2; private final Scheduler mSchedulerDisplay; private final Scheduler mSchedulerCompute; private final Scheduler mSchedulerCompute2; public RxRunner() { mSchedulerDisplay = Schedulers.from(Executors.newSingleThreadExecutor(new PrefixThreadFactory("DISPLAY"))); PrefixThreadFactory ioFactory = new PrefixThreadFactory("IO"); mSchedulerIO = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory)); mSchedulerIO2 = Schedulers.from(Executors.newSingleThreadExecutor(ioFactory)); PrefixThreadFactory computeFactory = new PrefixThreadFactory("COMPUTE"); mSchedulerCompute = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory)); mSchedulerCompute2 = Schedulers.from(Executors.newSingleThreadExecutor(computeFactory)); } @Override public void run(String[] args) { Observable<Integer> src1 = getIntegerObservable(1); Observable<Integer> src2 = getIntegerObservable(2).subscribeOn(mSchedulerIO2); src1 .map(new Func1<Integer, String>() { @Override public String call(Integer integer) { printThreadInfo("map1"); return "map-1, num=" + integer; } }) .map(new Func1<String, String>() { @Override public String call(String s) { printThreadInfo("map2"); return "map-2, s=" + s; } }) .zipWith(src2, new Func2<String, Integer, String>() { @Override public String call(String s, Integer integer) { printThreadInfo("zipWith"); return "zipWith s=" + s + ", num=" + integer; } }) //第一个subscribeOn指定整个流程的主要IO操作线程 .subscribeOn(mSchedulerIO) .doOnSubscribe(new Action0() { @Override public void call() { printThreadInfo("doOnSubscribe"); } }) //不能替代第一个subscribeOn,但是它的作用在于指定doOnSubscribe操作的线程 .subscribeOn(mSchedulerCompute) .doOnSubscribe(new Action0() { @Override public void call() { printThreadInfo("doOnSubscribe"); } }) .subscribeOn(mSchedulerCompute2) //指定最终subscriber的线程 .observeOn(mSchedulerDisplay) .subscribe(new Subscriber<String>() { @Override public void onStart() { //onStart线程不能指定,始终在subscribe的调用线程 printThreadInfo("onStart"); } @Override public void onCompleted() { printThreadInfo("onCompleted"); System.exit(0); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onNext(String s) { printThreadInfo("onNext, s=" + s); } }); } private Observable<Integer> getIntegerObservable(int num) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onStart(); subscriber.onNext(num); subscriber.onCompleted(); printThreadInfo("create, num=" + num); } }); } private void printThreadInfo(String tag) { String name = Thread.currentThread().getName(); Logger.p("threadName=[%s], tag=%s", name, tag); } }
其中ThreadFactory类如下:
package com.opticalix.theme.synchronize; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class PrefixThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String prefix; public PrefixThreadFactory(String prefix) { this.prefix = prefix + "-"; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, this.prefix + threadNumber.getAndIncrement()); return thread; } }
剩下的BaseRunner和Logger只是项目工具类,可以忽略。
执行结果如下:
threadName=[main], tag=onStart threadName=[COMPUTE-1], tag=doOnSubscribe threadName=[COMPUTE-2], tag=doOnSubscribe threadName=[IO-1], tag=map1 threadName=[IO-1], tag=map2 threadName=[IO-1], tag=create, num=1 threadName=[IO-2], tag=zipWith threadName=[DISPLAY-1], tag=onNext, s=zipWith s=map-2, s=map-1, num=1, num=2 threadName=[IO-2], tag=create, num=2 threadName=[DISPLAY-1], tag=onCompleted
由于多个observeOn需要小心控制线程控制范围,所以个人倾向于demo code所示的用法,在链最后使用subscribeOn & observeOn。
小心多个创建的情形(每个observable都要指定subscribeOn)。
如果使用doOnSubscribe,它的线程取决于其后第一个subscribeOn,但在这之前要调用subscribeOn指定一个线程作为其他数据处理的主要线程。