使用 Observable.FromEventPattern 在不活动或计数后执行操作
Use Observable.FromEventPattern to perform action after inactivity or count
我有一个从事件模式创建的可观察流,如下所示。
var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
h => keyspaceMonitor.KeySpaceChanged += h,
h => keyspaceMonitor.KeySpaceChanged -= h);
我想做的是订阅流并在有 10 秒不活动(没有事件发生)或 100 个事件被触发时执行一个方法而方法没有被执行。这是为了避免每 5 秒触发一次事件并且永远不会调用 onNext 方法的情况。
我怎样才能做到这一点?我知道如何做第一部分(见下文)但我不知道如何做计数逻辑。请注意,我已经知道如何订阅流了。
var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));
如有任何帮助,我们将不胜感激!谢谢。
将 Buffer
与自定义 bufferClosingSelector
结合使用。这里的想法是每个缓冲区都应该在 maxDuration
之后或 maxCount
项目之后关闭,以先到者为准。每次缓冲区关闭时,都会打开一个新缓冲区。
var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
var reachedMaxDuration = o
.Select(_ => Observable.Timer(maxDuration, scheduler))
.Switch();
return o.Buffer(() => o
.TakeUntil(reachedMaxDuration)
.Take(maxCount)
.LastOrDefaultAsync());
});
我假设您提供 IScheduler scheduler
。 throttledStream
的类型将是 IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>
.
我有一个从事件模式创建的可观察流,如下所示。
var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
h => keyspaceMonitor.KeySpaceChanged += h,
h => keyspaceMonitor.KeySpaceChanged -= h);
我想做的是订阅流并在有 10 秒不活动(没有事件发生)或 100 个事件被触发时执行一个方法而方法没有被执行。这是为了避免每 5 秒触发一次事件并且永远不会调用 onNext 方法的情况。
我怎样才能做到这一点?我知道如何做第一部分(见下文)但我不知道如何做计数逻辑。请注意,我已经知道如何订阅流了。
var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));
如有任何帮助,我们将不胜感激!谢谢。
将 Buffer
与自定义 bufferClosingSelector
结合使用。这里的想法是每个缓冲区都应该在 maxDuration
之后或 maxCount
项目之后关闭,以先到者为准。每次缓冲区关闭时,都会打开一个新缓冲区。
var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
var reachedMaxDuration = o
.Select(_ => Observable.Timer(maxDuration, scheduler))
.Switch();
return o.Buffer(() => o
.TakeUntil(reachedMaxDuration)
.Take(maxCount)
.LastOrDefaultAsync());
});
我假设您提供 IScheduler scheduler
。 throttledStream
的类型将是 IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>
.