Observable.Timer 或 Task.Delay 的 TPL

Observable.Timer or TPL with Task.Delay

我有一个要求,在初始延迟 10 秒后,我需要每 10 分钟执行一次 SomeMethod,但是要注意,10 分钟计时器应该在 [=14= 完成后启动].这是粗略的例子:

Start Task 00:00:00
(10 second delay)
SomeMethod executed at 00:00:10 (takes 15 minutes)
(10 minute delay)
SomeMethod executed at 00:25:10 
... and so on.

我知道如何使用 TPL 执行此操作。我可以使用 Task.Delay 开始任务并执行 SomeMethod,然后在每次完成后 (ContinueWith TaskStatus.RanToCompletion),我创建一个新任务并再次执行 SomeMethod

我的问题是使用 Observable.Timer 是否可行?像...

Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10))

此代码的问题在于,如果 SomeMethod 需要 15 分钟,我将有两个不同的 SomeMethod 实例 运行ning,这是我不想要的。我希望 10 分钟计时器在 SomeMethod 完成后启动。这可能使用 Observable 还是我应该留在 TPL?

编辑: 忘了在它自己的线程中提到我想要 SomeMethod 到 运行。

假设 SomeMethod 在完成时发出 OnCompleted 事件,我们可以将其写为 Observable

//If SomeMethod OnCompleted conforms to .NET Event Pattern
var completedObservable = Observable.FromEventPattern<OnCompletedEventArgs>(
            e => this.OnCompleted += e,
            e => this.OnCompleted += e);

//Subscribe to OnCompleted events
var repeatDisposable = completedObservable.Subscribe(_ => 
                                Observable.Timer(TimeSpan.FromMinutes(10))
                                          .Subscribe(_ => SomeMethod()));

//Start condition
var starterDisposable = Observable.Timer(TimeSpan.FromSeconds(10))
                                  .Subscribe(_ => SomeMethod());

您应该对 Observable.Timer 的使用进行更多调查。开箱即用。

了解 Rx 的一件重要事情是,它保证您永远不会获得单个订阅的并发执行。虽然 Rx 可能启用各种多线程场景,但它始终会序列化订阅。

所以,以这个可观察订阅为例:

Observable
    .Timer(TimeSpan.FromSeconds(10.0), TimeSpan.FromSeconds(2.0))
    .Timestamp()
    .Subscribe(x =>
    {
        Thread.Sleep(5000);
        Console.WriteLine(x.ToString());
    });

我创建了一个 observable,它将等待 10 秒开始发射值,然后 尝试 每 2 秒发射一个值。

然后我添加了 .Timestamp() 以准确记录值的生成时间。

最后,我订阅了一个强制线程休眠 5 秒的观察器。

这是前 4 个值输出:

0@2015-08-31 10:44:34 +00:00
1@2015-08-31 10:44:39 +00:00
2@2015-08-31 10:44:44 +00:00
3@2015-08-31 10:44:49 +00:00

您会注意到值之间的间隔为 5 秒。这非常接近你想要的。 Rx 看到两秒过去了,立即执行下一个值。

但是还有另一个 Rx 运算符可以完全满足您的要求 - .Generate(...)。这是生成各种可观察流的非常强大的运算符。

您想这样使用它:

Observable
    .Generate(0, x => true, x => x + 1, x => x,
        x => x == 0 ? TimeSpan.FromSeconds(10.0) : TimeSpan.FromSeconds(2.0))
    .Timestamp()
    .Subscribe(x =>
    {
        Thread.Sleep(5000);
        Console.WriteLine(x.ToString());
    });

在这种情况下,它完全按照您想要的方式工作。这是前十个值:

0@2015-08-31 10:48:27 +00:00
1@2015-08-31 10:48:34 +00:00
2@2015-08-31 10:48:41 +00:00
3@2015-08-31 10:48:48 +00:00
4@2015-08-31 10:48:55 +00:00
5@2015-08-31 10:49:02 +00:00
6@2015-08-31 10:49:09 +00:00
7@2015-08-31 10:49:16 +00:00
8@2015-08-31 10:49:23 +00:00
9@2015-08-31 10:49:30 +00:00

它每 7 秒触发一次。来自生成运算符的 2 和来自观察者的 5。

你显然可以输入你需要的时间。