Rx 方式在 c sharp 中执行某事直到条件为真或超时

Rx way to execute something in c sharp until a condition is true or timeout

我在数据库 table 中有一个作业名称列表,可以通过以作业名称作为参数调用 API 来启动。这些工作中的每一项都需要 10 到 30 分钟才能完成。还有另一个 API 可以在某个时间间隔(10 分钟)调用以检查作业的完成状态。只有当列表中的上一个作业有 completed/timeout 时,才能启动作业。这些工作做什么在这里无关紧要。

我是Rx框架的忠实粉丝,但对它知之甚少

我目前正在从数据库中获取所有作业名称,然后循环遍历它 在每个周期中,我都调用 Job-trigger API 来启动作业,然后每隔一段时间继续调用 Job-status API,直到作业完成或在下一个作业周期开始之前超时。

foreach(string jobName in lstJobs)
{
 if(StartTheJob(jobName)) //if job successfully started
 {
    do{
        Task.Delay(10000);

    }
    While(GetJobCompletionStatus(jobName));
    // how to timeout this do/while loop
 }
}

问题是,如果作业保持 运行 数小时,我找不到使 do while 循环超时并触发下一个作业的方法。使用 Rx 有更好的方法吗?

使用 CancellationToken:

foreach(string jobName in lstJobs)
{
 if(StartTheJob(jobName)) //if job successfully started
 {
    do{
        Task.Delay(10000);

    }
    While(!ct.IsCancellationRequested);
 }
}

您可以使用 CancellationTokenSource 创建一个 CancellationToken:

var cts = new CancellationTokenSource(timeout);
var ct = cts.Token;

以下是使用 Rx 执行此操作的方法:

IObservable<string> query =
    from jobName in lstJobs.ToObservable()
    from started in Observable.Start(() => StartTheJob(jobName))
    where started
    from done in
        Observable.Amb(
            Observable.Timer(TimeSpan.FromMinutes(30.0)).Select(_ => true),
            Observable
                .Interval(TimeSpan.FromMinutes(10.0))
                .SelectMany(_ => Observable.Start(() => GetJobCompletionStatus(jobName)))
                .Where(s => s))
    select jobName;

此处的逻辑是您逐步完成每个作业并调用 StartTheJob。如果成功,那么您正在等待来自(即 .Amb(...))30 分钟 .Timer(...)done 或每 10 分钟对 GetJobCompletionStatus 的调用,直到 true 被退回。最后它 returns jobName 这样你就可以看到什么是成功的。当 observable 完成时,您可以与原始列表进行比较,看看是否有任何不成功的。