Split IObservable<T> into IObservable<IObservable<T>>
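I have a source stream that occasionally emits a specific sentinel value to mark the start of a new stream. I want to convert my stream into an IObservable<IObservable<T>>. Can anyone think of an elegant way to do this?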
This should do the trick:
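// Share a single subscription: the source is used twice below,
// once for the values and once for the window boundaries.
observable = observable
    .Publish()
    .RefCount();

var splitted = observable
    .Window(observable.Where(x => x == SENTINEL))  // each sentinel closes the current window and opens a new one
    .Select(c => c.Where(x => x != SENTINEL));     // drop the sentinel itself from the windows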
Complete example:
using System;
using System.Reactive.Linq;

const int SENTINEL = -1;

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Select(x => x + 1)
    .Take(12)
    .Select(x => x % 5 == 0 ? SENTINEL : x) // Every fifth value is a sentinel
    .Publish()
    .RefCount();

observable
    .Window(observable.Where(x => x == SENTINEL))
    .Select(c => c.Where(x => x != SENTINEL))
    .Select((c, i) => c.Select(x => (i, x))) // Embed the index of the subsequence
    .Merge() // Merge them again
    .Do(x => Console.WriteLine($"Received: {x}"))
    .Subscribe();

await observable.LastOrDefaultAsync(); // Wait for the sequence to complete
Output:
Received: (0, 1)
Received: (0, 2)
Received: (0, 3)
Received: (0, 4)
Received: (1, 6)
Received: (1, 7)
Received: (1, 8)
Received: (1, 9)
Received: (2, 11)
Received: (2, 12)
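The example above merges the windows back into one stream. If you would rather consume each sub-stream as a unit, one option is to collect every window into a list. The following is only a minimal sketch, assuming System.Reactive is referenced; the fixed input array and the name `source` are made up for illustration, and `Publish()` with an explicit `Connect()` is used instead of `RefCount()` so that both subscriptions are in place before the synchronous source starts emitting.

```csharp
using System;
using System.Reactive.Linq;

const int SENTINEL = -1;

// Hypothetical input: the sentinel separates three groups of values.
var source = new[] { 1, 2, SENTINEL, 3, 4, SENTINEL, 5 }
    .ToObservable()
    .Publish(); // share one subscription between the values and the boundaries

source
    .Window(source.Where(x => x == SENTINEL))                    // split at each sentinel
    .SelectMany(w => w.Where(x => x != SENTINEL).ToList())       // one list per window, sentinel removed
    .Subscribe(list => Console.WriteLine(string.Join(", ", list)));

source.Connect(); // start pushing values only after the pipeline is subscribed
```

Note that a sentinel at the very start or end of the source, or two sentinels in a row, would produce an empty list, so you may want to filter those out depending on your needs.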