转载

RxJS 核心概念之Subject

什么是Subject?在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。

Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。

每一个Subject都是一个Observable(可观察对象)对于一个Subject,你可以订阅( subscribe )它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。

Subject的内部实现中,并不会在被订阅( subscribe )后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的 addListener 机制类似。

每一个Subject也可以作为Observer(观察者)Subject同样也是一个由 next(v)error(e) ,和 complete() 这些方法组成的对象。调用 next(theValue) 方法后,Subject会向所有已经在其上注册的Observer多路推送 theValue

下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:

var subject = new Rx.Subject();  subject.subscribe({   next: (v) => console.log('observerA: ' + v) }); subject.subscribe({   next: (v) => console.log('observerB: ' + v) });  subject.next(1); subject.next(2);

控制台输出结果如下:

 observerA: 1 observerB: 1 observerA: 2 observerB: 2

既然Subject是一个Observer,你可以把它作为 subscribe (订阅)普通Observable时的参数,如下面例子所示:

var subject = new Rx.Subject();  subject.subscribe({   next: (v) => console.log('observerA: ' + v) }); subject.subscribe({   next: (v) => console.log('observerB: ' + v) });  var observable = Rx.Observable.from([1, 2, 3]);  observable.subscribe(subject); // 你可以传递Subject来订阅observable

执行后结果如下:

 observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3

通过上面的实现:我们发现可以通过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的作用——作为单路Observable转变为多路Observable的桥梁。

还有几种特殊的 Subject 类型,分别是 BehaviorSubjectReplaySubject ,和 AsyncSubject

多路推送的Observable

在以后的语境中,每当提到“多路推送的Observable”,我们特指通过Subject构建的Observable执行环境。否则“普通的Observable”只是一个不会共享执行环境并且被订阅后才生效的一系列值。

通过使用Subject可以创建拥有相同执行环境的多路的Observable。

下面展示了 多路 的运作方式:Subject从普通的Observable订阅了数据,然后其他Observer又订阅了这个Subject,示例如下:

var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject);  // 通过`subject.subscribe({...})`订阅Subject的Observer: multicasted.subscribe({   next: (v) => console.log('observerA: ' + v) }); multicasted.subscribe({   next: (v) => console.log('observerB: ' + v) });  // 让Subject从数据源订阅开始生效: multicasted.connect();

multicast 方法返回一个类似于Observable的可观察对象,但是在其被订阅后,它会表现Subject的特性。 multicast 返回的对象同时是 ConnectableObservable 类型的,拥有 connect() 方法。

connect() 方法非常的重要,它决定Observable何时开始执行。由于调用 connect() 后,Observable开始执行,因此, connect() 会返回一个 Subscription 供调用者来终止执行。

引用计数

通过手动调用 connect() 返回的Subscription控制执行十分繁杂。通常,我们希望在有第一个Observer订阅Subject后 自动 connnect ,当所有Observer都取消订阅后终止这个Subject。

我们来分析一下下面例子中subscription的过程:

  1. 第一个Observer 订阅了多路推送的 Observable

  2. 多路Observable被连接

  3. 向第一个Observer发送 值为 0next 通知

  4. 第二个Observer订阅了多路推送的 Observable

  5. 向第一个Observer发送 值为 1next 通知

  6. 向第二个Observer发送 值为 1next 通知

  7. 第一个Observer取消了对多路推送的Observable的订阅

  8. 向第二个Observer发送 值为 2next 通知

  9. 第二个Observer取消了对多路推送的Observable的订阅

  10. 取消对多路推送的Observable的连接

通过显式地调用 connect() ,代码如下:

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect;  subscription1 = multicasted.subscribe({   next: (v) => console.log('observerA: ' + v) }); subscriptionConnect = multicasted.connect();  setTimeout(() => {   subscription2 = multicasted.subscribe({     next: (v) => console.log('observerB: ' + v)   }); }, 600);  setTimeout(() => {   subscription1.unsubscribe(); }, 1200);  setTimeout(() => {   subscription2.unsubscribe();   subscriptionConnect.unsubscribe();  }, 2000);

如果你不想显式地调用 connect() 方法,可以在ConnectableObservable类型的Observable上调用 refCount() 方法。方法会进行引用计数:记录Observable被订阅的行为。当订阅数从 01refCount() 会调用 connect() 方法。到订阅数从 10 ,他会终止整个执行过程。

refCount 使得多路推送的Observable在被订阅后自动执行,在所有观察者取消订阅后,停止执行。

下面是示例:

var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect;  console.log('observerA subscribed'); subscription1 = refCounted.subscribe({   next: (v) => console.log('observerA: ' + v) });  setTimeout(() => {   console.log('observerB subscribed');   subscription2 = refCounted.subscribe({     next: (v) => console.log('observerB: ' + v)   }); }, 600);  setTimeout(() => {   console.log('observerA unsubscribed');   subscription1.unsubscribe(); }, 1200);  setTimeout(() => {   console.log('observerB unsubscribed');   subscription2.unsubscribe(); }, 2000);

执行输出结果如下:

 observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed

只有ConnectableObservables拥有 refCount() 方法,调用后会返回一个 Observable 而不是新的ConnectableObservable。

BehaviorSubject

BehaviorSubject 是Subject的一个衍生类,具有“最新的值”的概念。它总是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从 BehaviorSubject 收到“最新的值”。

BehaviorSubjects非常适于表示“随时间推移的值”。举一个形象的例子,Subject表示一个人的生日,而Behavior则表示一个人的岁数。(生日只是一天,一个人的岁数会保持到下一次生日之前。)

下面例子中,展示了如何用 0 初始化BehaviorSubject,当Observer订阅它时, 0 是第一个被推送的值。紧接着,在第二个Observer订阅BehaviorSubject之前,它推送了 2 ,虽然订阅在推送 2 之后,但是第二个Observer仍然能接受到 2

var subject = new Rx.BehaviorSubject(0 /* 初始值 */);  subject.subscribe({   next: (v) => console.log('observerA: ' + v) });  subject.next(1); subject.next(2);  subject.subscribe({   next: (v) => console.log('observerB: ' + v) });  subject.next(3);

输出结果如下:

 observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3

ReplaySubject

ReplaySubject 如同于 BehaviorSubjectSubject 的子类。通过 ReplaySubject 可以向新的订阅者推送旧数值,就像一个录像机 ReplaySubject 可以记录Observable的一部分状态(过去时间内推送的值)。

.一个 ReplaySubject 可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。

你可以指定回放值的数量:

var subject = new Rx.ReplaySubject(3 /* 回放数量 */);  subject.subscribe({   next: (v) => console.log('observerA: ' + v) });  subject.next(1); subject.next(2); subject.next(3); subject.next(4);  subject.subscribe({   next: (v) => console.log('observerB: ' + v) });  subject.next(5);

输出如下:

 observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5

除了回放数量,你也可以以毫秒为单位去指定“窗口时间”,决定ReplaySubject记录多久以前Observable推送的数值。下面的例子中,我们把回放数量设置为 100 ,把窗口时间设置为 500 毫秒:

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);  subject.subscribe({   next: (v) => console.log('observerA: ' + v) });  var i = 1; setInterval(() => subject.next(i++), 200);  setTimeout(() => {   subject.subscribe({     next: (v) => console.log('observerB: ' + v)   }); }, 1000);

第二个Observer接受到 3 (600ms), 4 (800ms) 和 5 (1000ms),这些值均在订阅之前的 500 毫秒内推送(窗口长度 1000ms – 600ms = 400ms < 500ms):

 observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 ...

AsyncSubject

AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。

var subject = new Rx.AsyncSubject();  subject.subscribe({   next: (v) => console.log('observerA: ' + v) });  subject.next(1); subject.next(2); subject.next(3); subject.next(4);  subject.subscribe({   next: (v) => console.log('observerB: ' + v) });  subject.next(5); subject.complete();

输出结果如下:

 observerA: 5 observerB: 5

AsyncSubject 与 last() 操作符相似,等待完成通知后推送执行过程的最后一个值。

原文  http://hao.jser.com/archive/10345/
正文到此结束
Loading...