转载

RxJava 线程模型

本文基于 RxJava 2.1.2 。根据代码和输出日志会更容易理解。

RxJava 的线程模型如下:

1. 不指定线程的情况

  • 不指定线程也就是不使用 observeOnsubscribeOn ,所有操作在调用 subscribe 的线程执行。
@Test
public void noThread() {
    buildObservable().subscribe();
}

上面代码的输出为:

Thread[main]   execute   Action start emmit
Thread[main]   execute   Operation-1, event: 1
Thread[main]   execute   Operation-2, event: 1

2. subscribeOn

  • subscribeOn 不管调用多少次,只以第一次为准。如果只使用了 subscribeOn 、没有使用 observeOn ,则所有操作在第一次调用生成的线程里执行。
@Test
public void subscribeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .subscribeOn(scheduler("subscribeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("Action subscribe");
            latch.countDown();
        });

    latch.await();
}

上面代码的输出为:

create scheduler subscribeOn-2
create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
Thread[subscribeOn-1]   execute   Action subscribe

3. observeOn

  • observeOn 必须跟 subscribeOn 一起使用,单独使用会抛出空引用异常。
  • observeOn 应在 subscribeOn 的后面调用,否则会出现死锁的情况。
  • observeOn 操作会更改后续操作的执行线程,直至下一个 observeOn 调用之前的操作或 subscribe 操作。
@Test
public void observeOn() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    Observable<Integer> observable = buildObservable();
    observable
        .subscribeOn(scheduler("subscribeOn-1"))
        .observeOn(scheduler("observeOn-1"))
        .doOnNext(i -> {
            showMessageWithThreadName("Operation-3, event: " + i);
        })
        .observeOn(scheduler("observeOn-2"))
        .subscribe(i -> {
            showMessageWithThreadName("subscribe  " + i);
            latch.countDown();
        });

    latch.await();
}

上面代码的输出为:

create scheduler subscribeOn-1
Thread[subscribeOn-1]   execute   Action start emmit
Thread[subscribeOn-1]   execute   Operation-1, event: 1
Thread[subscribeOn-1]   execute   Operation-2, event: 1
create scheduler observeOn-1
Thread[observeOn-1]   execute   Operation-3, event: 1
create scheduler observeOn-2
Thread[observeOn-2]   execute   subscribe  1

4. 辅助代码

// 返回用给定线程名 命名的Scheduler
private Scheduler scheduler(String name) {
    return Schedulers.from(Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            System.out.println("create scheduler " + name);
            Thread t = new Thread(r, name);
            return t;
        }
    }));
}

// 输出当前线程名和给的消息
private void showMessageWithThreadName(String msg) {
    Thread t = Thread.currentThread();
    System.out.printf("%-10s   execute   %s/n", "Thread[" + t.getName() + "]", msg);
}

// 构建一个带有两个中间操作的 Observable
private Observable<Integer> buildObservable() {
    return Observable.fromPublisher((Subscriber<? super Integer> s) -> {
        showMessageWithThreadName("Action start emmit");
        // 消息源
        s.onNext(1);
        s.onComplete();
    })
    .doOnNext(i -> {
        showMessageWithThreadName("Operation-1, event: " + i);
    })
    .doOnNext(i -> {
        showMessageWithThreadName("Operation-2, event: " + i);
    });
}
原文  https://coderbee.net/index.php/framework/20180411/1625
正文到此结束
Loading...