使用 Reactive Extensions 进行数据库轮询

Database polling with Reactive Extensions

我必须及时查询数据库以了解遗留系统的状态。我想过将查询包装在 Observable 周围,但我不知道正确的方法。

基本上每5秒就是同一个查询但是我恐怕要面对这些问题了:

额外的细节:

我几乎可以肯定查询应该在另一个线程中执行,但我不知道可观察对象应该是什么样子,读过 Introduction to Rx by Lee Campbell

我认为这是你应该做的:

var query =
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
    from ds in Observable.Amb(
        Observable.Start(() => /* Your DataSet query */),
        Observable
            .Timer(TimeSpan.FromSeconds(10.0))
            .Select(_ => new DataSet("TimeOut")))
    select ds;

这会触发一个新查询, 执行间隔为 5 秒。距离上一个开始还不到5秒,距离上一个结束才5秒

然后你尝试你的查询,但是你 .Amb 它有一个计时器,在 10 秒后 returns 一个特殊的 DataSet。如果您的查询在 10 秒之前完成,那么它将获胜,否则将返回特殊的 DataSet.Amb 运算符基本上是一个 "race" 运算符 - 第一个产生值的可观察对象获胜。

这是一个相当经典的使用 Rx 轮询另一个系统的案例。大多数人会使用 Observable.Interval 作为他们的首选操作员,并且对大多数人来说没问题。

但是您对超时和重试有特定要求。在这种情况下,我认为您最好结合使用运算符:

  • Observable.Timer 允许您在指定时间执行查询
  • Timeout 识别超限的数据库查询
  • ToObservable() 将您的 Task 结果映射到可观察的序列。
  • Retry 允许您在超时后恢复
  • Repeat 允许您在数据库查询成功后继续。这也将在上一个数据库查询完成和下一个数据库查询开始之间保持初始 period/gap。

这个有效的 LINQPad 片段应该向您展示查询正常工作:

void Main()
{
    var pollingPeriod = TimeSpan.FromSeconds(5);
    var dbQueryTimeout = TimeSpan.FromSeconds(10);

    //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

    var query = Observable.Timer(pollingPeriod, scheduler)
                    .SelectMany(_ => DatabaseQuery().ToObservable())
                    .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                    .Retry()    //Loop on errors
                    .Repeat();  //Loop on success

    query.StartWith("Seed")
        .TimeInterval(scheduler)    //Just to debug, print the timing gaps.
        .Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
    //Oscillate the delay between 3 and 12 seconds
    delay += delayModifier;
    var timespan = TimeSpan.FromSeconds(delay);
    if (delay < 4 || delay > 11)
        delayModifier *= -1;
    timespan.Dump("delay");
    await Task.Delay(timespan);
    return "Value";
}

结果如下:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

样本的关键部分是....

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
                .SelectMany(_ => DatabaseQuery().ToObservable())
                .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                .Retry()    //Loop on errors
                .Repeat();  //Loop on success

编辑: 以下是对如何得出此解决方案的进一步说明。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md