反应式 - 将可观察间隔与手动触发器相结合
Reactive - Combining Observable Interval with manual triggers
我有 Observable.Interval(TimeSpan.FromSeconds(1))
和一个订阅者,它在每次出现时间间隔时检查数据库中的内容。但有时当我从数据库进行检查时,我想立即执行另一次检查(再次调用该订阅者,因为我知道队列中有东西)。
我已经通过在订阅者方法中将 Interval 与 while
组合来实现类似的效果:
Observable
.Interval(TimeSpan.FromSeconds(1))
.Sample(TimeSpan.FromSeconds(1)) //to avoid multiple 'stacked' intervals
.Subscribe(RepeatAction);
private void RepeatAction(long _)
{
bool wasSuccess;
do
{
wasSuccess = CheckingInDB(); //Long operation
} while (wasSuccess );
}
但是使用 纯 反应是否有可能实现这种行为?
是的。有可能。
不过首先,您对 Rx 存在误解。
如果您 运行 此代码:
void Main()
{
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Sample(TimeSpan.FromSeconds(1.0))
.Timestamp()
.Subscribe(RepeatAction);
}
private void RepeatAction(Timestamped<long> _)
{
Console.WriteLine(_.Timestamp);
Thread.Sleep(10000);
}
你会得到这样的结果:
2016/05/11 10:37:57 +00:00
2016/05/11 10:38:07 +00:00
2016/05/11 10:38:17 +00:00
2016/05/11 10:38:27 +00:00
您会看到生成的每个值之间的步长是 10 秒,而不是 1 秒。Interval
运算符只是确保每个值之间的间隔至少 TimeSpan
的持续时间,但如果观察者花费更长的时间,则持续时间将与每个订阅者花费的时间一样长。它不会对值进行排队。
另一种看待它的方式是 .Sample(TimeSpan.FromSeconds(1))
什么都不做,因为 .Interval(TimeSpan.FromSeconds(1.0))
确保值之间的最小差距已经是 1 秒。
现在,使用纯 Rx 运算符来解决问题。试试这个:
var query =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(_ =>
Observable
.While(
() => CheckingInDB(),
Observable.Return(Unit.Default)))
.Switch();
这将每秒尝试检查数据库,但一旦它达到一个值,它就会快速重复检查,直到它没有。然后它等待 1 秒并重试。
我有 Observable.Interval(TimeSpan.FromSeconds(1))
和一个订阅者,它在每次出现时间间隔时检查数据库中的内容。但有时当我从数据库进行检查时,我想立即执行另一次检查(再次调用该订阅者,因为我知道队列中有东西)。
我已经通过在订阅者方法中将 Interval 与 while
组合来实现类似的效果:
Observable
.Interval(TimeSpan.FromSeconds(1))
.Sample(TimeSpan.FromSeconds(1)) //to avoid multiple 'stacked' intervals
.Subscribe(RepeatAction);
private void RepeatAction(long _)
{
bool wasSuccess;
do
{
wasSuccess = CheckingInDB(); //Long operation
} while (wasSuccess );
}
但是使用 纯 反应是否有可能实现这种行为?
是的。有可能。
不过首先,您对 Rx 存在误解。
如果您 运行 此代码:
void Main()
{
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Sample(TimeSpan.FromSeconds(1.0))
.Timestamp()
.Subscribe(RepeatAction);
}
private void RepeatAction(Timestamped<long> _)
{
Console.WriteLine(_.Timestamp);
Thread.Sleep(10000);
}
你会得到这样的结果:
2016/05/11 10:37:57 +00:00
2016/05/11 10:38:07 +00:00
2016/05/11 10:38:17 +00:00
2016/05/11 10:38:27 +00:00
您会看到生成的每个值之间的步长是 10 秒,而不是 1 秒。Interval
运算符只是确保每个值之间的间隔至少 TimeSpan
的持续时间,但如果观察者花费更长的时间,则持续时间将与每个订阅者花费的时间一样长。它不会对值进行排队。
另一种看待它的方式是 .Sample(TimeSpan.FromSeconds(1))
什么都不做,因为 .Interval(TimeSpan.FromSeconds(1.0))
确保值之间的最小差距已经是 1 秒。
现在,使用纯 Rx 运算符来解决问题。试试这个:
var query =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(_ =>
Observable
.While(
() => CheckingInDB(),
Observable.Return(Unit.Default)))
.Switch();
这将每秒尝试检查数据库,但一旦它达到一个值,它就会快速重复检查,直到它没有。然后它等待 1 秒并重试。