遇到某个值时暂停流
Pause stream when it encounters a certain value
我有一个流,每当遇到某个值时我想暂停/停留。最优雅的方法是什么?
假设幻数是 4 - 这将阻止在 x 时间内发出任何其他值。它还会阻止 'stale' 值在暂停期间发出,但在暂停后,它将切换到最新值
Source +-1-2-3-4-5-6-7-8-9
Pause +----x
Output +-1-2-3-4----67-8-9
您可以使用 CombineLatest()
将源与另一个处理暂停状态的可观察对象结合起来。合并后使用简单的 Where()
过滤当前暂停的条目。请参阅以下源代码:
ISubject<bool> pause = new BehaviorSubject<bool>(false);
IObservable<long> original = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> controlled = original.CombineLatest(pause, (o, p) => {
return new { o, p};
})
.Where(it => !it.p)
.Select(it => it.o);
Stopwatch sw = Stopwatch.StartNew();
controlled.Subscribe(it => {
Console.WriteLine($"{sw.ElapsedMilliseconds} - {it}");
});
Thread.Sleep((int)(3.1 * 1000)); // wait for 3.1 seconds
pause.OnNext(true); // pause it
Thread.Sleep((int)(3.5 * 1000)); // wait for 3.5 seconds
pause.OnNext(false); // resume
Thread.Sleep(5 * 1000); // and finally wait for 5 seconds
这将生成以下输出:
1017 - 0
2009 - 1
3007 - 2
6608 - 5
7007 - 6
8007 - 7
9007 - 8
10007 - 9
11007 - 10
如您所见,值 3
、4
和 5
最初是 blocked/ignored,但是当恢复暂停标志时最后一个值 5
是 "resend" 一旦暂停标志被重置(6.6 秒后)。之后发出正常间隔值。
我有一个流,每当遇到某个值时我想暂停/停留。最优雅的方法是什么?
假设幻数是 4 - 这将阻止在 x 时间内发出任何其他值。它还会阻止 'stale' 值在暂停期间发出,但在暂停后,它将切换到最新值
Source +-1-2-3-4-5-6-7-8-9
Pause +----x
Output +-1-2-3-4----67-8-9
您可以使用 CombineLatest()
将源与另一个处理暂停状态的可观察对象结合起来。合并后使用简单的 Where()
过滤当前暂停的条目。请参阅以下源代码:
ISubject<bool> pause = new BehaviorSubject<bool>(false);
IObservable<long> original = Observable.Interval(TimeSpan.FromSeconds(1));
IObservable<long> controlled = original.CombineLatest(pause, (o, p) => {
return new { o, p};
})
.Where(it => !it.p)
.Select(it => it.o);
Stopwatch sw = Stopwatch.StartNew();
controlled.Subscribe(it => {
Console.WriteLine($"{sw.ElapsedMilliseconds} - {it}");
});
Thread.Sleep((int)(3.1 * 1000)); // wait for 3.1 seconds
pause.OnNext(true); // pause it
Thread.Sleep((int)(3.5 * 1000)); // wait for 3.5 seconds
pause.OnNext(false); // resume
Thread.Sleep(5 * 1000); // and finally wait for 5 seconds
这将生成以下输出:
1017 - 0
2009 - 1
3007 - 2
6608 - 5
7007 - 6
8007 - 7
9007 - 8
10007 - 9
11007 - 10
如您所见,值 3
、4
和 5
最初是 blocked/ignored,但是当恢复暂停标志时最后一个值 5
是 "resend" 一旦暂停标志被重置(6.6 秒后)。之后发出正常间隔值。