This article is based on RxJava 2.1.2. The behavior is easiest to follow by reading the code together with its log output.
Without `observeOn` or `subscribeOn`, every operation runs on the thread that calls `subscribe`.

```java
@Test
public void noThread() {
    buildObservable().subscribe();
}
```
The code above prints:

```
Thread[main] execute Action start emmit
Thread[main] execute Operation-1, event: 1
Thread[main] execute Operation-2, event: 1
```
No matter how many times `subscribeOn` is called, only the first call (the one closest to the source) takes effect. If only `subscribeOn` is used, without `observeOn`, all operations run on the thread created by that first call.

```java
@Test
public void subscribeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Observable<Integer> observable = buildObservable();
    observable
            .subscribeOn(scheduler("subscribeOn-1"))
            .subscribeOn(scheduler("subscribeOn-2"))
            .subscribe(i -> {
                showMessageWithThreadName("Action subscribe");
                latch.countDown();
            });
    latch.await();
}
```
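The code above prints:

```
create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1] execute Action start emmit
Thread[subscribeOn-1] execute Operation-1, event: 1
Thread[subscribeOn-1] execute Operation-2, event: 1
Thread[subscribeOn-1] execute Action subscribe
```

Both scheduler threads are created, but the source and every operator run on `subscribeOn-1`, the scheduler from the first call.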
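To see why only the first `subscribeOn` decides where the source runs, here is a minimal JDK-only sketch. It is a simplified model, not RxJava's actual implementation: `Source`, `subscribeOn`, `singleThread`, and `run` are hypothetical names made up for this illustration. The assumption is that each `subscribeOn` simply moves the upstream `subscribe` call onto its own executor, so the wrapper applied first is the last hop before the source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Simplified model of subscribeOn built on plain executors (not RxJava code).
final class SubscribeOnModel {

    // A source pushes values to whoever subscribes.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Models subscribeOn: the upstream subscribe call is moved onto the executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    static ExecutorService singleThread(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService first = singleThread("subscribeOn-1");
        ExecutorService second = singleThread("subscribeOn-2");

        Source<Integer> source = downstream -> {
            log.add("emit on " + Thread.currentThread().getName());
            downstream.accept(1);
        };

        // Same order as the test above: subscribeOn-1 is applied first, subscribeOn-2 second.
        Source<Integer> chain = subscribeOn(subscribeOn(source, first), second);
        chain.subscribe(i -> {
            log.add("receive on " + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the subscription first hops to `subscribeOn-2`, then to `subscribeOn-1`, and only then reaches the source, so both logged lines name `subscribeOn-1`. That matches the order of the `create scheduler` lines in the log above.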
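In this example, `observeOn` has to be combined with `subscribeOn`: used on its own it throws a `NullPointerException`, and if it is called before `subscribeOn` the test hangs as if deadlocked. This is not a general RxJava rule, since `observeOn` normally works without `subscribeOn`. The likely cause is that the `fromPublisher` source in `buildObservable()` never calls `onSubscribe`, which `observeOn` relies on to set up its internal queue. `observeOn` changes the thread on which the subsequent operations run, up to the next `observeOn` call, or through to `subscribe` if there is none.

```java
@Test
public void observeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Observable<Integer> observable = buildObservable();
    observable
            .subscribeOn(scheduler("subscribeOn-1"))
            .observeOn(scheduler("observeOn-1"))
            .doOnNext(i -> {
                showMessageWithThreadName("Operation-3, event: " + i);
            })
            .observeOn(scheduler("observeOn-2"))
            .subscribe(i -> {
                showMessageWithThreadName("subscribe " + i);
                latch.countDown();
            });
    latch.await();
}
```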
The code above prints:

```
create scheduler subscribeOn-1
Thread[subscribeOn-1] execute Action start emmit
Thread[subscribeOn-1] execute Operation-1, event: 1
Thread[subscribeOn-1] execute Operation-2, event: 1
create scheduler observeOn-1
Thread[observeOn-1] execute Operation-3, event: 1
create scheduler observeOn-2
Thread[observeOn-2] execute subscribe 1
```
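The way `observeOn` affects only what comes after it can be sketched with plain executors. This is a simplified JDK-only model, not RxJava's real implementation (which uses an internal queue and a drain loop): `Source`, `observeOn`, `doOnNext`, `singleThread`, `threadName`, and `run` are hypothetical names for this illustration. The assumption is that `observeOn` re-dispatches each event to its executor before passing it downstream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Simplified model of observeOn built on plain executors (not RxJava code).
final class ObserveOnModel {

    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Models observeOn: each event is handed to the executor before going downstream.
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return downstream ->
                upstream.subscribe(value -> executor.execute(() -> downstream.accept(value)));
    }

    // Models doOnNext: runs a side effect on whichever thread delivers the event.
    static <T> Source<T> doOnNext(Source<T> upstream, Consumer<T> action) {
        return downstream -> upstream.subscribe(value -> {
            action.accept(value);
            downstream.accept(value);
        });
    }

    static ExecutorService singleThread(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static String threadName() {
        return Thread.currentThread().getName();
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService first = singleThread("observeOn-1");
        ExecutorService second = singleThread("observeOn-2");

        Source<Integer> source = downstream -> downstream.accept(1);
        Source<Integer> op1 = doOnNext(source, i -> log.add("Operation-1 on " + threadName()));
        Source<Integer> hop1 = observeOn(op1, first);
        Source<Integer> op3 = doOnNext(hop1, i -> log.add("Operation-3 on " + threadName()));
        Source<Integer> hop2 = observeOn(op3, second);

        hop2.subscribe(i -> {
            log.add("subscribe on " + threadName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model `Operation-1` runs on the thread that calls `subscribe`, `Operation-3` runs on `observeOn-1`, and the final consumer runs on `observeOn-2`. Each hop applies only to the steps that follow it, the same pattern as in the log above.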
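Helper methods used by the tests above:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;

// Returns a Scheduler backed by a single thread with the given name
private Scheduler scheduler(String name) {
    return Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            System.out.println("create scheduler " + name);
            Thread t = new Thread(r, name);
            return t;
        }
    }));
}

// Prints the current thread name followed by the given message
private void showMessageWithThreadName(String msg) {
    Thread t = Thread.currentThread();
    System.out.printf("%-10s execute %s%n", "Thread[" + t.getName() + "]", msg);
}

// Builds an Observable with two intermediate operations
private Observable<Integer> buildObservable() {
    // Note: this source never calls s.onSubscribe(...), which the Reactive Streams protocol expects
    return Observable.fromPublisher((Subscriber<? super Integer> s) -> {
                showMessageWithThreadName("Action start emmit");
                // Event source
                s.onNext(1);
                s.onComplete();
            })
            .doOnNext(i -> {
                showMessageWithThreadName("Operation-1, event: " + i);
            })
            .doOnNext(i -> {
                showMessageWithThreadName("Operation-2, event: " + i);
            });
}
```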