Rx、TPL 数据流和 windows 服务
Rx, TPL dataflow and windows service
在 windows 服务持续工作的情况下使用 Rx 和数据流是个好主意吗?
我只是想知道使用 rx 间隔每 5 分钟从 db 获取数据并将其放入阻塞收集,然后使用 Tpl 数据流来处理数据。这是正确的方法吗?
更新
我有新问题。我必须从数据库中获取项目,验证它并在数据库上执行提交。即使 validation return false 我也必须执行提交。现在,我想以 5 分钟的间隔从数据库中获取数据,但我不想在 dataFlow.So 中有双行,也许我必须使用 blockingCollection 来保留行 ID?
不完全是。
首先,您根本不需要 BlockingCollection。数据流块有自己的线程安全输入缓冲区。
其次,块也意味着使用它们自己的任务来执行,这使得它们适合 运行 操作,例如调用数据库和处理它们的结果。另一方面,Rx 旨在处理事件流,这通常是使用 单个 线程完成的。
也就是说,您可以组合 Rx 和 Dataflow 块。我正在使用数据流管道
其次,Rx 是用来处理事件流的。为此,它通常在 current 线程上运行。
也就是说,您可以组合 Rx 和数据流块。将间隔序列与块组合起来很容易:
var block=new ActionBlock<int>(i=>whatever(i));
var blockObs=blockObs.AsObserver();
var interval=Observable.Interval(obs).Subscribe(obs);
使用数据库需要更多块。假设您要处理单独的行,第一个块应该接收触发器、加载数据和 return 行。第二个块应该接收单独的行并处理它们。
假设您使用 Dapper 将 return 行作为对象:
var headBlock=new TransformManyBlock<int,SomeRow>(_=>
{
using(var con=new SqlConnection(..whatever))
{
var items=con.ExecuteQuery(theQuery);
return itmes;
}
});
var secondBlock = new ActionBlock<SomeRow>(row=>DoSomething(row),
new ExecutionDatalfowBlockOptions{MaxDegreeOfParalelism=10});
headBlock.LinkTo(secondBlock,new DataFlowLinkOptions{PropagateCompletion=true});
var headObs=headBlock.AsObserver();
Observable.Interval(TimeSpan.FromMinutes(1)).Subscribe(headObs);
不过这只是一个演示。当您关闭服务器并等待任何挂起的操作完成时,您应该添加代码来停止管道。
您还应该添加错误处理代码,即使发生错误也能让您继续处理。如果未处理异常,抛出的块将终止。如果您 return 一个 "wrapped" 结果,则可以处理此问题,并带有指示成功或失败的标志。 LinkTo
接受一个谓词,您可以使用该谓词将失败的 messages/rows 例如移动到记录器或丢弃它们,例如 :
var loggerBlock=new ActionBlock<RowWithFlag>(row=>_log.Error(...));
headBlock.LinkTo(secondBlock,linkOptions,row=>row.IsOK);
headBlock.LinkTo(loggerBlock,linkOptions,row=>!row.IsOK);
在 windows 服务持续工作的情况下使用 Rx 和数据流是个好主意吗? 我只是想知道使用 rx 间隔每 5 分钟从 db 获取数据并将其放入阻塞收集,然后使用 Tpl 数据流来处理数据。这是正确的方法吗?
更新
我有新问题。我必须从数据库中获取项目,验证它并在数据库上执行提交。即使 validation return false 我也必须执行提交。现在,我想以 5 分钟的间隔从数据库中获取数据,但我不想在 dataFlow.So 中有双行,也许我必须使用 blockingCollection 来保留行 ID?
不完全是。
首先,您根本不需要 BlockingCollection。数据流块有自己的线程安全输入缓冲区。
其次,块也意味着使用它们自己的任务来执行,这使得它们适合 运行 操作,例如调用数据库和处理它们的结果。另一方面,Rx 旨在处理事件流,这通常是使用 单个 线程完成的。
也就是说,您可以组合 Rx 和 Dataflow 块。我正在使用数据流管道
其次,Rx 是用来处理事件流的。为此,它通常在 current 线程上运行。
也就是说,您可以组合 Rx 和数据流块。将间隔序列与块组合起来很容易:
var block=new ActionBlock<int>(i=>whatever(i));
var blockObs=blockObs.AsObserver();
var interval=Observable.Interval(obs).Subscribe(obs);
使用数据库需要更多块。假设您要处理单独的行,第一个块应该接收触发器、加载数据和 return 行。第二个块应该接收单独的行并处理它们。
假设您使用 Dapper 将 return 行作为对象:
var headBlock=new TransformManyBlock<int,SomeRow>(_=>
{
using(var con=new SqlConnection(..whatever))
{
var items=con.ExecuteQuery(theQuery);
return itmes;
}
});
var secondBlock = new ActionBlock<SomeRow>(row=>DoSomething(row),
new ExecutionDatalfowBlockOptions{MaxDegreeOfParalelism=10});
headBlock.LinkTo(secondBlock,new DataFlowLinkOptions{PropagateCompletion=true});
var headObs=headBlock.AsObserver();
Observable.Interval(TimeSpan.FromMinutes(1)).Subscribe(headObs);
不过这只是一个演示。当您关闭服务器并等待任何挂起的操作完成时,您应该添加代码来停止管道。
您还应该添加错误处理代码,即使发生错误也能让您继续处理。如果未处理异常,抛出的块将终止。如果您 return 一个 "wrapped" 结果,则可以处理此问题,并带有指示成功或失败的标志。 LinkTo
接受一个谓词,您可以使用该谓词将失败的 messages/rows 例如移动到记录器或丢弃它们,例如 :
var loggerBlock=new ActionBlock<RowWithFlag>(row=>_log.Error(...));
headBlock.LinkTo(secondBlock,linkOptions,row=>row.IsOK);
headBlock.LinkTo(loggerBlock,linkOptions,row=>!row.IsOK);