反应性扩展 Timer/Interval 重置
Reactive extension Timer/Interval reset
我有一个项目,我需要每 10 秒发送一次状态消息,除非同时有更新。意思是,每次有更新时,计时器都会重置。
var res = Observable
.Interval(TimeSpan.FromSeconds(10))
.Where(_ => condition);
res.Subscribe(_ => Console.WriteLine("Status sent."));
现在我知道 "Where" 只会在计时器结束时应用,所以它没有帮助。但是,我想知道是否有办法重置间隔;或使用带重复的 Timer()。
使用标准 Rx 运算符很容易实现。
您的问题中不清楚的是 "update" 到底是什么。我将假设您有某种会在每次更新时触发的可观察对象,或者您可以创建一个主题,当有更新时,您将调用 .OnNext(...)
。如果没有可观察的更新,很难知道何时重置计时器。
代码如下:
var update = new Subject<bool>();
var res =
update
.Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0)))
.Switch();
res
.Subscribe(_ => Console.WriteLine("Status sent."));
update.OnNext(true);
res
查询现在等待,直到它从 update
获得一个值,然后选择一个新的 Observable.Interval
。这意味着在 Select
之后,类型是 IObservable<IObservable<long>>
,因此需要 .Switch()
才能将其转换为 IObservable<long>
。 .Switch()
通过仅传递最新观察到的可观察对象的值并处理任何先前的可观察对象来做到这一点。换句话说,对于每个 update
一个新的定时器被启动并且之前的定时器被取消。这意味着如果您的更新发生频率超过 10 秒,则计时器将永远不会触发。
现在,如果 res
observable 本身就是一个更新,那么您可以这样做:
res
.Subscribe(_ =>
{
update.OnNext(true);
Console.WriteLine("Status sent.");
});
没关系 - 它仍然有效,但对于每个计时器触发 res
都会创建一个新计时器。这意味着任何依赖于您的 update
observable/subject 的东西仍将正常运行。
我认为您也可以在这里使用 Throttle。 Throttle 的目的是在给定时间跨度内接收到另一个元素时不让元素通过。因此,在您的情况下,如果在 10 秒内收到更新消息,则不要发送状态。请参阅下面的单元测试,它使用 200 个滴答作为节流周期。
[TestMethod]
public void Publish_Status_If_Nothing_Receieved()
{
//Arrange
var scheduler = new TestScheduler();
var statusObserver = scheduler.CreateObserver<Unit>();
var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3),
OnNext(700, 4));
var waitTime = TimeSpan.FromTicks(200);
//Act
updateStream.Throttle(waitTime, scheduler)
.Select(_ => Unit.Default)
.Subscribe(statusObserver);
//Verify no status received
scheduler.AdvanceTo(100);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Verify no status received
scheduler.AdvanceTo(200);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Assert status received
scheduler.AdvanceTo(400);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
//Verify no more status received
scheduler.AdvanceTo(700);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
}
我随身带着这个小帮手方法:
public static IObservable<long> CreateAutoResetInterval<TSource>(IObservable<TSource> resetter, TimeSpan timeSpan, bool immediate = false)
{
return resetter.Select(_ => immediate ? Observable.Interval(timeSpan).StartWith(0) : Observable.Interval(timeSpan)).Switch();
}
基本上和Enigmativity的回答是一样的机制
我有一个项目,我需要每 10 秒发送一次状态消息,除非同时有更新。意思是,每次有更新时,计时器都会重置。
var res = Observable
.Interval(TimeSpan.FromSeconds(10))
.Where(_ => condition);
res.Subscribe(_ => Console.WriteLine("Status sent."));
现在我知道 "Where" 只会在计时器结束时应用,所以它没有帮助。但是,我想知道是否有办法重置间隔;或使用带重复的 Timer()。
使用标准 Rx 运算符很容易实现。
您的问题中不清楚的是 "update" 到底是什么。我将假设您有某种会在每次更新时触发的可观察对象,或者您可以创建一个主题,当有更新时,您将调用 .OnNext(...)
。如果没有可观察的更新,很难知道何时重置计时器。
代码如下:
var update = new Subject<bool>();
var res =
update
.Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0)))
.Switch();
res
.Subscribe(_ => Console.WriteLine("Status sent."));
update.OnNext(true);
res
查询现在等待,直到它从 update
获得一个值,然后选择一个新的 Observable.Interval
。这意味着在 Select
之后,类型是 IObservable<IObservable<long>>
,因此需要 .Switch()
才能将其转换为 IObservable<long>
。 .Switch()
通过仅传递最新观察到的可观察对象的值并处理任何先前的可观察对象来做到这一点。换句话说,对于每个 update
一个新的定时器被启动并且之前的定时器被取消。这意味着如果您的更新发生频率超过 10 秒,则计时器将永远不会触发。
现在,如果 res
observable 本身就是一个更新,那么您可以这样做:
res
.Subscribe(_ =>
{
update.OnNext(true);
Console.WriteLine("Status sent.");
});
没关系 - 它仍然有效,但对于每个计时器触发 res
都会创建一个新计时器。这意味着任何依赖于您的 update
observable/subject 的东西仍将正常运行。
我认为您也可以在这里使用 Throttle。 Throttle 的目的是在给定时间跨度内接收到另一个元素时不让元素通过。因此,在您的情况下,如果在 10 秒内收到更新消息,则不要发送状态。请参阅下面的单元测试,它使用 200 个滴答作为节流周期。
[TestMethod]
public void Publish_Status_If_Nothing_Receieved()
{
//Arrange
var scheduler = new TestScheduler();
var statusObserver = scheduler.CreateObserver<Unit>();
var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3),
OnNext(700, 4));
var waitTime = TimeSpan.FromTicks(200);
//Act
updateStream.Throttle(waitTime, scheduler)
.Select(_ => Unit.Default)
.Subscribe(statusObserver);
//Verify no status received
scheduler.AdvanceTo(100);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Verify no status received
scheduler.AdvanceTo(200);
Assert.AreEqual(0, statusObserver.Messages.Count);
//Assert status received
scheduler.AdvanceTo(400);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
//Verify no more status received
scheduler.AdvanceTo(700);
statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default));
}
我随身带着这个小帮手方法:
public static IObservable<long> CreateAutoResetInterval<TSource>(IObservable<TSource> resetter, TimeSpan timeSpan, bool immediate = false)
{
return resetter.Select(_ => immediate ? Observable.Interval(timeSpan).StartWith(0) : Observable.Interval(timeSpan)).Switch();
}
基本上和Enigmativity的回答是一样的机制