来自 EventEmitter 的热共享 Observable

Hot and shared Observable from an EventEmitter

有没有办法从 EventEmitter(或 Angular 2 alpha 46 / RxJS 5 alpha 中可用的等效项)获得热可观察值?即如果我们在值解析后订阅,它会触发先前解析的值。类似于我们总是 return 相同的承诺。

理想情况下,只使用 Angular 2 个对象(我在某处读到一个轻量的 RxJS 稍后会被嵌入以消除依赖),否则导入 RxJS 就可以了。 AsyncSubject 似乎符合我的需要,但它在 RxJS 5 alpha 中不可用。

我尝试了以下方法,但没有成功(从不触发)。知道如何使用它吗?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

Usecase:仅访问一些异步对象一次(例如 new EventEmitter[ 中的一系列 HTTP 调用 merged/wrapped =46=]),但将解析的异步对象提供给任何订阅它的 service/component,即使他们在解析后订阅(收到 HTTP 响应)也是如此。

编辑:问题不在于如何合并 HTTP 响应,而在于如何从 EventEmitter 或 Angular 2 alpha 46 / RxJS 可用的任何等效项中获取(热门?)可观察对象允许在异步结果后订阅的 5 alpha retrieved/resolved(HTTP 只是异步来源的一个示例)。 myEventEmitter.share() 不起作用(参见上面的插件),尽管它可以与由 HTTP 编辑的 Observable return 一起使用(参见 plunker from @Eric Martinez ).从 Angular 2 alpha 46 开始,.toRx() 方法不再存在,EventEmitter 是可观察对象和主题本身。

只要我们始终 return 相同的承诺对象,这就可以很好地处理承诺。由于我们在 HTTP Angular 2 服务中引入了观察者,因此我想避免混淆承诺和观察者(据说观察者比承诺更强大,所以它应该允许做一些容易用承诺做的事情)。

Specs about share()(我还没有找到第 5 版 alpha 的文档 - Angular 2 使用的版本)- 正在 Observable return 编辑 [= =57=] 2 HTTP 服务,不适用于 EventEmitter。

编辑:阐明了为什么不使用由 HTTP 编辑的 Observable return 并补充说不直接使用 RxJS 会更好。

编辑:更改描述:关注的是多个订阅,而不是合并 HTTP 结果。

谢谢!

您似乎描述的功能不是冷可观察的功能,而是 Rx.BehaviourSubject 的功能。在这里查看有关 Rxjs 主题的解释:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md

我从那里引用:

BehaviourSubject is similar to ReplaySubject, except that it only stored the last value it published. BehaviourSubject also requires a default value upon initialization. This value is sent to observers when no other value has been received by the subject yet. This means that all subscribers will receive a value instantly on subscribe, unless the Subject has already completed.

Rx.AsyncSubject 的行为最接近承诺:

AsyncSubject is similar to the Replay and Behavior subjects, however it will only store the last value, and only publish it when the sequence is completed. You can use the AsyncSubject type for situations when the source observable is hot and might complete before any observer can subscribe to it. In this case, AsyncSubject can still provide the last value and publish it to any future subscribers.

另外两条评论:

  • 在你的 plunker 中:this._coldObservable = emitter.share();。使用 share returns 一个热门的可观察对象!
  • EventEmitter 实际上首先扩展了 subject

更新:Rx.Observable:

周围包装 EventEmitter
function toRx ( eventEmitter ) {
  return Rx.Observable.create(function ( observer ) {
    eventEmitter.subscribe(function listener ( value ) {observer.onNext(value)});
    // Ideally you also manage error and completion, if that makes sense with Angular2
    return function () {
      /* manage end of subscription here */
    };
  };
)
}

一旦你有了Rx.Observable,你就可以申请share()shareReplay(1),任何你想要的。

我敢打赌 Angular 团队迟早会提出桥接功能,但如果您不想等待,可以自己做。

ReplaySubject 正在做我正在寻找的事情。 @robwormald 提供了一个关于 gitter 的工作示例,我稍作修改以更好地演示。

公开 HTTP 响应:

import {Injectable} from 'angular2/angular2';
import {Http} from 'angular2/http';
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'

@Injectable()
export class PeopleService {
  constructor(http:Http) {
    this.people = new ReplaySubject(1);

    http.get('api/people.json')
      .map(res => res.json())
      .subscribe(this.people);
  }
}

多次订阅:

// ... annotations
export class App {
  constructor(peopleService:PeopleService) {

    people.subscribe(v => {
      console.log(v)
    });

    //some time later

    setTimeout(() => {
      people.subscribe(v => {
        console.log(v)
      });
      people.subscribe(v => {
        console.log(v)
      });
    },2000)
  }
}

Full plunker

编辑:BehaviorSubject 是另一种选择。在这个用例中,不同之处在于初始值,例如,如果我们想在使用 HTTP 响应更新之前显示缓存中的内容。