RX 和缓冲

RX and buffering

我正在尝试获取以下可观察值(缓冲容量为 10 个刻度):

Time    0    5    10   15   20   25   30   35   40
        |----|----|----|----|----|----|----|----|
Source  A  B C   D        E F  G    H

Result            A                 E         H
                  B                 F
                  C                 G
                  D

Phase   |<------->|-------|<------->|<------->|
             B        I        B         B

也就是说,行为与 Buffer observable 非常相似,不同之处在于缓冲阶段不是在精确的时隙中,而是从空闲阶段推送的第一个符号开始。我的意思是,在上面的示例中,缓冲阶段以 'A'、'E' 和 'H' 符号开始。

有没有办法组合 Observable 还是我必须从头开始实现它?

任何帮助将不胜感激。

试试这个:

IObservable<T> source = ...;
IScheduler scheduler = ...;
IObservable<IList<T>> query = source
    .Publish(obs => obs
        .Buffer(() => obs.Take(1).IgnoreElements()
            .Concat(Observable.Return(default(T)).Delay(duration, scheduler))
            .Amb(obs.IgnoreElements())));

缓冲区关闭选择器在开始时调用一次,然后在缓冲区关闭时调用一次。选择器显示 "The buffer being started now should be closed duration after the first element of this buffer, or when the source completes, whichever occurs first."

编辑:根据您的评论,如果您想要对 query 进行多个订阅,共享对 source 的单个订阅,您可以通过将 .Publish().RefCount() 附加到查询来实现.

IObservable<IList<T>> query = source
    .Publish(obs => obs
        .Buffer(() => obs.Take(1).IgnoreElements()
            .Concat(Observable.Return(default(T)).Delay(duration, scheduler))
            .Amb(obs.IgnoreElements())));
    .Publish()
    .RefCount();