使用 Reactive Extensions 进行数据库轮询
Database polling with Reactive Extensions
我必须及时查询数据库以了解遗留系统的状态。我想过将查询包装在 Observable
周围,但我不知道正确的方法。
基本上每5秒就是同一个查询但是我恐怕要面对这些问题了:
- 如果查询的执行需要 10 秒怎么办?我不想
如果前一个查询仍在处理中,则执行任何新查询。
- 另外,应该有一个超时。如果当前查询没有执行
例如,20 秒后,应显示一条信息性消息
已记录并应发送新的尝试(相同的查询)。
额外的细节:
- 查询只是一个
SELECT
returns 具有状态代码列表的数据集(正在工作,已故障).
- Observable 序列将始终获取从查询中收到的最新数据,类似于 Switch 扩展方法。
- 我想将数据库查询(冗长的操作)包装到任务中,但我不确定这是否是最佳选择。
我几乎可以肯定查询应该在另一个线程中执行,但我不知道可观察对象应该是什么样子,读过 Introduction to Rx by Lee Campbell。
我认为这是你应该做的:
var query =
from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
from ds in Observable.Amb(
Observable.Start(() => /* Your DataSet query */),
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(_ => new DataSet("TimeOut")))
select ds;
这会触发一个新查询, 执行间隔为 5 秒。距离上一个开始还不到5秒,距离上一个结束才5秒
然后你尝试你的查询,但是你 .Amb
它有一个计时器,在 10 秒后 returns 一个特殊的 DataSet
。如果您的查询在 10 秒之前完成,那么它将获胜,否则将返回特殊的 DataSet
。 .Amb
运算符基本上是一个 "race" 运算符 - 第一个产生值的可观察对象获胜。
这是一个相当经典的使用 Rx 轮询另一个系统的案例。大多数人会使用 Observable.Interval
作为他们的首选操作员,并且对大多数人来说没问题。
但是您对超时和重试有特定要求。在这种情况下,我认为您最好结合使用运算符:
Observable.Timer
允许您在指定时间执行查询
Timeout
识别超限的数据库查询
ToObservable()
将您的 Task
结果映射到可观察的序列。
Retry
允许您在超时后恢复
Repeat
允许您在数据库查询成功后继续。这也将在上一个数据库查询完成和下一个数据库查询开始之间保持初始 period/gap。
这个有效的 LINQPad 片段应该向您展示查询正常工作:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
结果如下:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
样本的关键部分是....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
编辑:
以下是对如何得出此解决方案的进一步说明。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
我必须及时查询数据库以了解遗留系统的状态。我想过将查询包装在 Observable
周围,但我不知道正确的方法。
基本上每5秒就是同一个查询但是我恐怕要面对这些问题了:
- 如果查询的执行需要 10 秒怎么办?我不想 如果前一个查询仍在处理中,则执行任何新查询。
- 另外,应该有一个超时。如果当前查询没有执行 例如,20 秒后,应显示一条信息性消息 已记录并应发送新的尝试(相同的查询)。
额外的细节:
- 查询只是一个
SELECT
returns 具有状态代码列表的数据集(正在工作,已故障). - Observable 序列将始终获取从查询中收到的最新数据,类似于 Switch 扩展方法。
- 我想将数据库查询(冗长的操作)包装到任务中,但我不确定这是否是最佳选择。
我几乎可以肯定查询应该在另一个线程中执行,但我不知道可观察对象应该是什么样子,读过 Introduction to Rx by Lee Campbell。
我认为这是你应该做的:
var query =
from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
from ds in Observable.Amb(
Observable.Start(() => /* Your DataSet query */),
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(_ => new DataSet("TimeOut")))
select ds;
这会触发一个新查询, 执行间隔为 5 秒。距离上一个开始还不到5秒,距离上一个结束才5秒
然后你尝试你的查询,但是你 .Amb
它有一个计时器,在 10 秒后 returns 一个特殊的 DataSet
。如果您的查询在 10 秒之前完成,那么它将获胜,否则将返回特殊的 DataSet
。 .Amb
运算符基本上是一个 "race" 运算符 - 第一个产生值的可观察对象获胜。
这是一个相当经典的使用 Rx 轮询另一个系统的案例。大多数人会使用 Observable.Interval
作为他们的首选操作员,并且对大多数人来说没问题。
但是您对超时和重试有特定要求。在这种情况下,我认为您最好结合使用运算符:
Observable.Timer
允许您在指定时间执行查询Timeout
识别超限的数据库查询ToObservable()
将您的Task
结果映射到可观察的序列。Retry
允许您在超时后恢复Repeat
允许您在数据库查询成功后继续。这也将在上一个数据库查询完成和下一个数据库查询开始之间保持初始 period/gap。
这个有效的 LINQPad 片段应该向您展示查询正常工作:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
结果如下:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
样本的关键部分是....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
编辑: 以下是对如何得出此解决方案的进一步说明。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md