将 Observable 拆分为 windows
Splitting Observable into windows
我正在寻找 Observable.Window 类似运算符的 window 选择器
例如,我们可以取一个自然数序列。
我想知道如何将此序列拆分为 windows,以便在数字大于 4 或 window 大小已达到 3
时每个新的 window 开始
输入序列是IObservable<int>
输出序列为IObservable<IObservable<int>>
序列 1 2 5 3 1 1 2 3 1 5 0
将产生 windows 1 2; 5 3 1; 1 2 3; 1; 5 0
使用 C# 这可行:
var observable =
source
.Concat(Observable.Return(-1))
.Publish(sp =>
sp.Zip(sp.Skip(1), (x0, x1) => new { x0, x1 })
.Publish(zsp =>
zsp
.Window(zsp.Where(x => x.x1 >= 4))
.Select(xs => xs.Select(x => x.x0).Window(3))
.Merge()));
我得到这个结果:
根据我对问题的评论,我不确定序列 1, 2, 3, 5
是否应该产生 windows 1 2 3 | 5
或 1 2 3 | | 5
(请参阅空 window 这就是 Enigmativity 的答案所产生的)。我的回答不会产生空 window:
public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> predicate, int maximumWindowSize)
{
return Observable.Create<IObservable<T>>(obs =>
{
var currentWindow = new Subject<T>();
obs.OnNext(currentWindow);
var count = 0;
return source.Subscribe(x =>
{
if (count == maximumWindowSize || predicate(x))
{
count = 0;
currentWindow.OnCompleted();
currentWindow = new Subject<T>();
obs.OnNext(currentWindow);
}
currentWindow.OnNext(x);
count++;
}, obs.OnError, () =>
{
obs.OnCompleted();
currentWindow.OnCompleted();
});
});
}
可以这样使用:
var windows = source.Window(x => x > 4, 3);
我正在寻找 Observable.Window 类似运算符的 window 选择器
例如,我们可以取一个自然数序列。 我想知道如何将此序列拆分为 windows,以便在数字大于 4 或 window 大小已达到 3
时每个新的 window 开始输入序列是IObservable<int>
输出序列为IObservable<IObservable<int>>
序列 1 2 5 3 1 1 2 3 1 5 0 将产生 windows 1 2; 5 3 1; 1 2 3; 1; 5 0
使用 C# 这可行:
var observable =
source
.Concat(Observable.Return(-1))
.Publish(sp =>
sp.Zip(sp.Skip(1), (x0, x1) => new { x0, x1 })
.Publish(zsp =>
zsp
.Window(zsp.Where(x => x.x1 >= 4))
.Select(xs => xs.Select(x => x.x0).Window(3))
.Merge()));
我得到这个结果:
根据我对问题的评论,我不确定序列 1, 2, 3, 5
是否应该产生 windows 1 2 3 | 5
或 1 2 3 | | 5
(请参阅空 window 这就是 Enigmativity 的答案所产生的)。我的回答不会产生空 window:
public static IObservable<IObservable<T>> Window<T>(this IObservable<T> source, Func<T, bool> predicate, int maximumWindowSize)
{
return Observable.Create<IObservable<T>>(obs =>
{
var currentWindow = new Subject<T>();
obs.OnNext(currentWindow);
var count = 0;
return source.Subscribe(x =>
{
if (count == maximumWindowSize || predicate(x))
{
count = 0;
currentWindow.OnCompleted();
currentWindow = new Subject<T>();
obs.OnNext(currentWindow);
}
currentWindow.OnNext(x);
count++;
}, obs.OnError, () =>
{
obs.OnCompleted();
currentWindow.OnCompleted();
});
});
}
可以这样使用:
var windows = source.Window(x => x > 4, 3);