使用异步函数订阅可观察序列

Subscribing to observable sequence with async function

我有一个 asnyc 函数,我想对 IObservable 序列中的每个观察调用该函数,限制一次传递一个事件。消费者期望在传输过程中不超过一条消息;如果我理解正确的话,这也是 RX 合同。

考虑这个示例:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}

Consume 函数伪造了 750 毫秒的处理时间,ob 每 100 毫秒产生一次事件。上面的代码有效,但在随机线程上调用 task.Wait()。如果我改为在注释掉的第 3 行中订阅,那么 Consume 的调用速率与 ob 产生事件的速率相同(而且我什至无法理解我正在使用的 Subscribe 的重载在这个评论的声明中,所以这可能是胡说八道。

那么我如何正确地将一个事件从可观察序列传递到 async 函数?

订阅者不应该是长 运行,因此不支持在订阅处理程序中执行长 运行 异步方法。

相反,将您的异步方法视为从另一个序列获取值的单值可观察序列。 现在你可以组合序列了,这就是 Rx 的设计目标。

现在您已经实现了这一飞跃,您可能会得到类似于@Reijher 在 Howto call back async function from rx subscribe? 中创建的内容。

他的代码分解如下

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();

在这种情况下,您正在创建隐式队列。 在生产者比消费者快的任何问题中,都需要使用队列在等待时收集值。 我个人更喜欢通过将数据放入队列来明确这一点。 或者,您可以显式使用调度程序来发出信号,表明线程模型应该弥补松弛。

对于 Rx 新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。 指南不将它们放入您的订阅者中的原因有很多,例如: 1. 你破坏了错误模型 2. 你正在混合异步模型(这里是 rx,那里是 task) 3. subscribe 是异步序列组合的消费者。异步方法只是一个单值序列,因此该视图不能作为序列的结尾,但结果可能是。

更新

为了说明关于打破错误模型的评论,这里更新了 OP 示例。

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex=> Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

在这里我们可以看到,如果 OnNext 处理程序抛出异常,那么我们就不受 Rx OnError 处理程序的保护。 异常将不会得到处理,很可能会导致应用程序崩溃。