RX 和缓冲
RX and buffering
我正在尝试获取以下可观察值(缓冲容量为 10 个刻度):
Time 0 5 10 15 20 25 30 35 40
|----|----|----|----|----|----|----|----|
Source A B C D E F G H
Result A E H
B F
C G
D
Phase |<------->|-------|<------->|<------->|
B I B B
也就是说,行为与 Buffer observable 非常相似,不同之处在于缓冲阶段不是在精确的时隙中,而是从空闲阶段推送的第一个符号开始。我的意思是,在上面的示例中,缓冲阶段以 'A'、'E' 和 'H' 符号开始。
有没有办法组合 Observable 还是我必须从头开始实现它?
任何帮助将不胜感激。
试试这个:
IObservable<T> source = ...;
IScheduler scheduler = ...;
IObservable<IList<T>> query = source
.Publish(obs => obs
.Buffer(() => obs.Take(1).IgnoreElements()
.Concat(Observable.Return(default(T)).Delay(duration, scheduler))
.Amb(obs.IgnoreElements())));
缓冲区关闭选择器在开始时调用一次,然后在缓冲区关闭时调用一次。选择器显示 "The buffer being started now should be closed duration
after the first element of this buffer, or when the source completes, whichever occurs first."
编辑:根据您的评论,如果您想要对 query
进行多个订阅,共享对 source
的单个订阅,您可以通过将 .Publish().RefCount()
附加到查询来实现.
IObservable<IList<T>> query = source
.Publish(obs => obs
.Buffer(() => obs.Take(1).IgnoreElements()
.Concat(Observable.Return(default(T)).Delay(duration, scheduler))
.Amb(obs.IgnoreElements())));
.Publish()
.RefCount();
我正在尝试获取以下可观察值(缓冲容量为 10 个刻度):
Time 0 5 10 15 20 25 30 35 40
|----|----|----|----|----|----|----|----|
Source A B C D E F G H
Result A E H
B F
C G
D
Phase |<------->|-------|<------->|<------->|
B I B B
也就是说,行为与 Buffer observable 非常相似,不同之处在于缓冲阶段不是在精确的时隙中,而是从空闲阶段推送的第一个符号开始。我的意思是,在上面的示例中,缓冲阶段以 'A'、'E' 和 'H' 符号开始。
有没有办法组合 Observable 还是我必须从头开始实现它?
任何帮助将不胜感激。
试试这个:
IObservable<T> source = ...;
IScheduler scheduler = ...;
IObservable<IList<T>> query = source
.Publish(obs => obs
.Buffer(() => obs.Take(1).IgnoreElements()
.Concat(Observable.Return(default(T)).Delay(duration, scheduler))
.Amb(obs.IgnoreElements())));
缓冲区关闭选择器在开始时调用一次,然后在缓冲区关闭时调用一次。选择器显示 "The buffer being started now should be closed duration
after the first element of this buffer, or when the source completes, whichever occurs first."
编辑:根据您的评论,如果您想要对 query
进行多个订阅,共享对 source
的单个订阅,您可以通过将 .Publish().RefCount()
附加到查询来实现.
IObservable<IList<T>> query = source
.Publish(obs => obs
.Buffer(() => obs.Take(1).IgnoreElements()
.Concat(Observable.Return(default(T)).Delay(duration, scheduler))
.Amb(obs.IgnoreElements())));
.Publish()
.RefCount();