现在你应该了解 Rx 的概念了,是时候开始创建和操作事件流了。操作事件流的原始实现是基于 C# 的 LINQ ,而 LINQ 是受到 functional programming 启发的。如果你了解 LINQ 更容易理解本节内容, 如果不了解也没关系。我们将从最简单的内容开始介绍。 大部分的 Rx 操作函数(operators )用来操作已经存在的事件流。在介绍操作函数之前,先来看看如何创建一个 Observable。
在第一部分示例中,我们使用 Subject 来手工的把数据推送给他并创建一个事件流。我们使用这种方式来示例一些核心的概念和Rx 中一些核心的函数 subscribe。大部分时候使用 Subject 都不是创建 Observable 的最佳方式。 下面将介绍如何创建 Observable 来发射事件流。
Observable 有很多工厂方法可以创建一个事件流。
just 函数创建一个发射预定义好的数据的 Observable ,发射完这些数据后,事件流就结束了。
Observable<String> values = Observable.just("one", "two", "three"); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: one Received: two Received: three Completed
这个函数创建的 Observable 只发射一个 onCompleted 事件就结束了。
Observable<String> values = Observable.empty(); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Completed
这个 Observable 将不会发射任何事件和数据。
Observable<String> values = Observable.never(); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
上面的代码不会打印任何东西。但是这个代码并没有阻塞住,实际上上面的代码立刻就执行完了。
这个 Observable 将会发射一个 error 事件,然后结束。
Observable<String> values = Observable.error(new Exception("Oops")); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Error: java.lang.Exception: Oops
defer 并没有定义一个新的 Observable, defer 只是用来声明当 Subscriber 订阅到一个 Observable 上时,该 Observable 应该如何创建。例如,如果我们想创建一个发射当前时间然后就结束的 Observable, 发射一个数据然后结束,看起来用 just 实现即可:
Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
结果:
1431443908375 1431443908375
注意上面两个 subscriber 相隔 1秒订阅这个 Observable,但是他们收到的时间数据是一样的!这是因为当订阅的时候,时间数据只调用一次。其实你希望的是,当 一个 subscriber 订阅的时候才去获取当前的时间。 defer 的参数是一个返回一个 Observable 对象的函数。该函数返回的 Observable 对象就是 defer 返回的 Observable 对象。 重点是,每当一个新的 Subscriber 订阅的时候,这个函数就重新执行一次。
Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
结果:
1431444107854 1431444108858
create 是非常强大的一个函数。可以创建任何你需要的 Observable。
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
上面是 create 函数的定义,参数 Observable.OnSubscribe
Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: Hello Completed
当有 Subscriber 订阅到这个 Observable 时(上面示例中的 values ),这个 Subscriber 对象就是你实现的函数中的参数 Subscriber。然后你可以在你的代码中把数据发射到这个 subscriber 中。注意,当数据发射完后,你需要手工的调用 onCompleted 来表明发射完成了。
如果之前的所有方法都不满足你的要求时,这个函数应当作为你创建自定义 Observable 的最佳方式。其实现方式和 第一部分我们通过 Subject 来发射事件类似,但是有几点非常重要的区别。首先:数据源被封装起来了,并和不相关的代码隔离开了。其次:Subject 有一些不太明显的问题,通过使用 Subject 你自己在管理状态,并且任何访问该 Subject 对象的人都可以往里面发送数据然后改变事件流。
还一个主要的区别是执行代码的时机,使用 create 创建的 Observable,当 Observable 创建的时候,你的函数还没有执行,只有当有 Subscriber 订阅的时候才执行。这就意味着每次当有 Subscriber 订阅的时候,该函数就执行一次。和 defer 的功能类似。结果和 ReplaySubject 类似, ReplaySubject 会缓存结果 当有新的 Subscriber 订阅的时候,把缓存的结果在发射给新的 Subscriber。如果要使用 ReplaySubject 来实现和 create 类似的功能,如果 create 中创建数据的函数是阻塞的话,则 ReplaySubject 在创建的时候线程会阻塞住知道 创建函数执行完。如果不想阻塞当前线程的话,则需要手工创建一个线程来初始化数据。其实 Rx 有更加优雅的方式解决这个问题。
其实使用 Observable.create 可以实现 前面几个工厂方法的功能。比如 上面的 create 函数的功能和 Observable.just(“hello”) 的功能是一样的。
在 functional programming(函数式编程)中,创建一系列数字是非常常见的。 RxJava 也提供了一些工厂方法来创建这样的序列。
做过函数式编码的程序员都了解这个函数的意思。 该函数发射一个整数序列:
Observable<Integer> values = Observable.range(10, 15);
上面示例将生成一个从 10 到 24 的数字序列(从 10 开始,发射 15个数字)。
创建一个无限的计时序列,每隔一段时间发射一个数字,从 0 开始:
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read();
结果:
Received: 0 Received: 1 Received: 2 Received: 3 ...
如果我们不调用 unsubscribe 的话,这个序列是不会停止的。
上面的代码在最后有个 System.in.read(); 阻塞语句,这个语句是有必要的,不然的话,程序不会打印任何内容就退出了。原因是我们的操作不是阻塞的:我们创建了一个每隔一段时间就发射数据的 Observable,然后我们注册了一个 Subscriber 来打印收到的数据。这两个操作都是非阻塞的,而 发射数据的计时器是运行在另外一个线程的,但是这个线程不会阻止 JVM 结束当前的程序,所以 如果没有 System.in.read(); 这个阻塞操作,还没发射数据则程序就已经结束运行了。
Observable.timer 有两个重载函数。第一个示例创建了一个 Observable, 该 Observable 等待一段时间,然后发射数据 0 ,然后就结束了。
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: 0 Completed
另外一个示例是,先等待一段时间,然后开始按照间隔的时间一直发射数据:
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: 0 Received: 1 Received: 2 ...
上面的示例,先等待 2秒,然后每隔一秒开始发射数据。
已经有很多工具来处理序列数据、集合和异步事件了,但是这些工具可能无法直接在 Rx 中使用。下面来介绍几个方法可以把这些工具产生的结果转换到你的 Rx 代码中。
如果你在使用类似 JavaFX 中的异步事件处理,则可以使用 Observable.create 把异步事件转换到 Observable 中:
Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); })
根据事件的不同,事件的类型(上面的示例中事件类型为 ActionEvent)可能适合作为 Observable 发射数据的类型。 如果你需要的是该事件类型的属性(比如事件发射的位置),则获取该属性并把获取到的结果发射给最终的 Subscriber ,Subscriber 接受到结果可能和事件发生时的数据不一样。为了避免这种问题,在 Observable 中传递的数据应该保持其状态不变。
在 Java 并发框架中经常使用 Future 来获取异步结果。 通过使用 from 可以把 Future 的结果发射到 Observable 中:
FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: 21 Completed
当 FutureTask 执行完后, Observable 发射 Future 获取到的结果然后结束。如果任务 取消了,则 Observable 会发射一个 java.util.concurrent.CancellationException 错误信息。
你还可以对 Future 设置超时时间:
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
当过了超时时间后, Future 还是没有返回结果, Observable 可以忽略其结果并发射一个 TimeoutException。
Observable.from 还有重载的函数可以用一个数据集合或者一个可以遍历的iterable 的数据来生成一个 Observable。逐个发射集合中的数据,最后发射一个 onCompleted 事件。
Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscriptionsubscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
结果:
Received: 1 Received: 2 Received: 3 Completed
Observable 和 Iterable 或者 Stream 是不能通用的。Observables 是基于 push 模型的,而 Iterable 是基于 pull 模型的。