触发第一个事件后几秒钟的 RX 缓冲区事件
RX buffer events for several seconds after first event is triggered
我有一个文件观察器,我可以从中观察创建和更改的事件。
我希望当第一个事件被触发(创建或更改)时,它需要开始缓冲 10 秒,在这 10 秒之后我想处理缓冲的事件。
我已经得到的是:
Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Created")
.Merge(Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Changed"))
.Buffer(TimeSpan.FromSeconds(10))
.Subscribe(list =>
{
Debug.WriteLine("Do something");
});
此代码每 10 秒执行 'Debug.WriteLine("Do something");'。
编辑:
好的,让我尝试用时间线来解释它。
- 文件观察器处于空闲状态,没有触发任何事件。
- 经过一段未知的时间后,一个文件被放置在目录中
- 创建的事件被触发
- 可观察列表开始缓冲(所有事件)10 秒
- 10 秒后,订阅操作开始执行,它将立即处理所有事件
希望这会让事情变得清晰
我假设您想要以下行为:
- 初始事件后,缓冲接下来 10 秒的所有事件。
- 一旦这 10 秒 window 结束,下一个应该为 10 秒后的所有事件触发一个新的 10 秒缓冲区。
假设我们有 5 个事件在 5 秒内均匀分布,间隔 13 秒,然后另外 5 个事件在 5 秒内均匀分布。大理石图看起来像这样:
timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return
使用直接 Buffer
的问题在于它看起来像上面提到的 stdbuff
,并将第二组事件分成两组,从而产生两个列表第二组事件:一组有三个事件,一组有两个事件。您想要一个列表(针对第二组),使用类似 desired
流的逻辑。从 0 开始捕获,return 在 10 列出。从 17 开始捕获,return 在 27 列出。
如果我(又)误解了您的意思,请 post 与上面类似的大理石图,代表您希望事情如何运作。
假设我理解正确,下面的代码将起作用...
//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));
//Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));
initialSource
.Publish( _source => _source
.Buffer(_source
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
)
)
.Subscribe(list =>
{
Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
Debug.WriteLine($"List Count: {list.Count}");
});
解释:
首先我们需要识别 'primary events',它们代表上面 desired
流描述中的 BeginCapture
注释。可以这样找到:
var primaryEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged();
一旦我们有了 BeginCapture
事件,它可以表示 window 开盘,就很容易找到 Return
事件或 window 收盘:
var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));
实际上,由于我们关心的关闭和打开之间没有发生任何事情,我们实际上只需要担心关闭事件,所以我们可以将其缩小为:
var closeEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10));
将其插入 Buffer
,closeEvents
为 bufferBoundaries
:
var bufferredLists = initialSource
.Buffer(initialsource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
);
最后,由于我们有多个订阅 initialSource
,我们希望使用 Publish
来确保并发正常工作,从而得出最终答案。
我有一个文件观察器,我可以从中观察创建和更改的事件。 我希望当第一个事件被触发(创建或更改)时,它需要开始缓冲 10 秒,在这 10 秒之后我想处理缓冲的事件。
我已经得到的是:
Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Created")
.Merge(Observable.FromEventPattern<FileSystemEventArgs>(FileSystemWatcher, "Changed"))
.Buffer(TimeSpan.FromSeconds(10))
.Subscribe(list =>
{
Debug.WriteLine("Do something");
});
此代码每 10 秒执行 'Debug.WriteLine("Do something");'。
编辑: 好的,让我尝试用时间线来解释它。
- 文件观察器处于空闲状态,没有触发任何事件。
- 经过一段未知的时间后,一个文件被放置在目录中
- 创建的事件被触发
- 可观察列表开始缓冲(所有事件)10 秒
- 10 秒后,订阅操作开始执行,它将立即处理所有事件
希望这会让事情变得清晰
我假设您想要以下行为:
- 初始事件后,缓冲接下来 10 秒的所有事件。
- 一旦这 10 秒 window 结束,下一个应该为 10 秒后的所有事件触发一个新的 10 秒缓冲区。
假设我们有 5 个事件在 5 秒内均匀分布,间隔 13 秒,然后另外 5 个事件在 5 秒内均匀分布。大理石图看起来像这样:
timeline: 0--1--2--3--4--5--6--7--8--9-10-11-12-13-14-15-16-17-18-19-20-21-22-23-24-25-26-27
events : x--x--x--x--x-------------------------------------x--x--x--x--x------------------
stdbuff : |----------------------------|-----------------------------|---------------------
desired : BeginCapture-----------------Return---------------BeginCapture------------------Return
使用直接 Buffer
的问题在于它看起来像上面提到的 stdbuff
,并将第二组事件分成两组,从而产生两个列表第二组事件:一组有三个事件,一组有两个事件。您想要一个列表(针对第二组),使用类似 desired
流的逻辑。从 0 开始捕获,return 在 10 列出。从 17 开始捕获,return 在 27 列出。
如果我(又)误解了您的意思,请 post 与上面类似的大理石图,代表您希望事情如何运作。
假设我理解正确,下面的代码将起作用...
//var initialSource = Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Created))
// .Merge(Observable.FromEventPattern<FileSystemEventArgs>(fileWatcher, nameof(FileSystemWatcher.Changed)));
//Comment this out, and use the above lines for your code. This just makes testing the Rx components much easier.
var initialSource = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5)
.Concat(Observable.Empty<long>().Delay(TimeSpan.FromSeconds(13)))
.Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5));
initialSource
.Publish( _source => _source
.Buffer(_source
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
)
)
.Subscribe(list =>
{
Debug.WriteLine($"Time-stamp: {DateTime.Now.ToLongTimeString()}");
Debug.WriteLine($"List Count: {list.Count}");
});
解释:
首先我们需要识别 'primary events',它们代表上面 desired
流描述中的 BeginCapture
注释。可以这样找到:
var primaryEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged();
一旦我们有了 BeginCapture
事件,它可以表示 window 开盘,就很容易找到 Return
事件或 window 收盘:
var closeEvents = primaryEvents.Delay(TimeSpan.FromSeconds(10));
实际上,由于我们关心的关闭和打开之间没有发生任何事情,我们实际上只需要担心关闭事件,所以我们可以将其缩小为:
var closeEvents = initialSource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10));
将其插入 Buffer
,closeEvents
为 bufferBoundaries
:
var bufferredLists = initialSource
.Buffer(initialsource
.Scan(DateTimeOffset.MinValue, (lastPrimary, _) => DateTimeOffset.Now - lastPrimary > TimeSpan.FromSeconds(10) ? DateTimeOffset.Now : lastPrimary)
.DistinctUntilChanged()
.Delay(TimeSpan.FromSeconds(10))
);
最后,由于我们有多个订阅 initialSource
,我们希望使用 Publish
来确保并发正常工作,从而得出最终答案。