如何将一个 RXJs 流用于 2 个不同的事物

How to use one RXJs stream for 2 different things

我的应用程序中有 2 个服务。一个通过网络加载 youTube 评论线程和评论的 youTube 服务,以及一个管理评论批次加载的评论服务。评论服务特定于我的应用程序,YouTube 服务与应用程序无关。

我有一个函数 getCommentThreadsForChannel 可以加载评论线程。这个的主要实现是在 youTube 服务中,但是这是在评论服务上调用的,它基本上只是调用 returns 从 youTube 服务中观察到的。

就调用它的我的控制器而言,这只是一个可观察的注释线程序列。但是,在我的 commentService 中,我想在本地存储这些线程。每当我获得 100 个以上的线程时,我想将其分批存储所有线程,这样我就不会为每个新数据位处理列表。我想出了这个代码:

    getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
        var threadStream: Rx.Observable<ICommentThread> =
            this.youTubeService.getCommentThreadsForChannel();

        threadStream
            .bufferWithCount(100)
            .scan( ( allItems, currentItem ) => {
                currentItem.forEach(thread => {
                    allItems.push(thread);
                });

                console.log( `Save items to local storage: ${allItems.length}` )

                return allItems;
            }, []  );

        return threadStream;
    }

我认为这里用于批处理线程和将所有线程累积到一个数组中的逻辑很好,但从未调用过此代码。我想这是因为我根本没有订阅这个帖子。

我不想在这里订阅,因为那样会订阅基础流,然后我会有 2 个订阅,所有数据都会加载两次(有很多数据 - 加载大约需要一分钟一次调用 100 个线程的 30 多个调用)。

我在这里基本上想要一个do,它不会影响传递给控制器​​的流,但我想使用 RXjs 的缓冲和累积逻辑。

我想我需要以某种方式共享或发布流,但我以前使用这些运算符收效甚微,并且看不出如果不添加第二个订阅我将如何做到这一点。

如何在不订阅两次的情况下共享一个流并以两种不同的方式使用它?我可以创建某种被动流,它只在基于订阅的可观察对象订阅时订阅吗?

我相信您要查找的是 share and replay, thankfully RX has the shareReplay(bufferSize).

的组合
var threadStream: Rx.Observable<ICommentThread> = this.youTubeService
                .getCommentThreadsForChannel()
                .shareReplay(1); 

getCommentThreadsForChannel(): Rx.Observable<ICommentThread> {
        threadStream
            .bufferWithCount(100)
            .scan( ( allItems, currentItem ) => {
                currentItem.forEach(thread => {
                    allItems.push(thread);
                });

                console.log( `Save items to local storage: ${allItems.length}` )

                return allItems;
            }, []  );

        return threadStream;
    }

使用多播运营商共享将确保在第一个之后的每个附加订阅不会发出不必要的请求,重播是为了确保所有未来的订阅都收到最后一个通知。

我最终解决了这个问题(在@MonkeyMagiic 的帮助下)。

我必须共享流,以便我可以对数据做 2 件不同的事情,缓冲它并分别在每个值上。问题是这两个流都必须订阅,但我不想在服务中订阅——这应该在控制器中完成。

解决方案是再次合并 2 个流并忽略缓冲区中的值:

var intervalStream = Rx.Observable.interval(250)
    .take(8)
    .do( function(value){console.log( "source: " + value );} )
    .shareReplay(1);

var bufferStream = intervalStream.bufferWithCount(3)
    .do( function(values){
      console.log( "do something with buffered values: " + values );
    } )
    .flatMap( function(values){ return Rx.Observable.empty(); } );

var mergeStream = intervalStream.merge( bufferStream );

mergeStream.subscribe(
  function( value ){ console.log( "value received by controller: " + value ); },
  function( error ){ console.log( "error: " + error ); },
  function(){ console.log( "onComplete" ); }
);

输出:

"source: 0"
"value received by controller: 0"
"source: 1"
"value received by controller: 1"
"source: 2"
"value received by controller: 2"
"do something with buffered values: 0,1,2"
"source: 3"
"value received by controller: 3"
"source: 4"
"value received by controller: 4"
"source: 5"
"value received by controller: 5"
"do something with buffered values: 3,4,5"
"source: 6"
"value received by controller: 6"
"source: 7"
"value received by controller: 7"
"do something with buffered values: 6,7"
"onComplete"

JSBin