使用异步函数订阅可观察序列
Subscribing to observable sequence with async function
我有一个 asnyc
函数,我想对 IObservable
序列中的每个观察调用该函数,限制一次传递一个事件。消费者期望在传输过程中不超过一条消息;如果我理解正确的话,这也是 RX 合同。
考虑这个示例:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Consume
函数伪造了 750 毫秒的处理时间,ob
每 100 毫秒产生一次事件。上面的代码有效,但在随机线程上调用 task.Wait()
。如果我改为在注释掉的第 3 行中订阅,那么 Consume
的调用速率与 ob
产生事件的速率相同(而且我什至无法理解我正在使用的 Subscribe
的重载在这个评论的声明中,所以这可能是胡说八道。
那么我如何正确地将一个事件从可观察序列传递到 async
函数?
订阅者不应该是长 运行,因此不支持在订阅处理程序中执行长 运行 异步方法。
相反,将您的异步方法视为从另一个序列获取值的单值可观察序列。
现在你可以组合序列了,这就是 Rx 的设计目标。
现在您已经实现了这一飞跃,您可能会得到类似于@Reijher 在 Howto call back async function from rx subscribe? 中创建的内容。
他的代码分解如下
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
在这种情况下,您正在创建隐式队列。
在生产者比消费者快的任何问题中,都需要使用队列在等待时收集值。
我个人更喜欢通过将数据放入队列来明确这一点。
或者,您可以显式使用调度程序来发出信号,表明线程模型应该弥补松弛。
对于 Rx 新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。
指南不将它们放入您的订阅者中的原因有很多,例如:
1. 你破坏了错误模型
2. 你正在混合异步模型(这里是 rx,那里是 task)
3. subscribe 是异步序列组合的消费者。异步方法只是一个单值序列,因此该视图不能作为序列的结尾,但结果可能是。
更新
为了说明关于打破错误模型的评论,这里更新了 OP 示例。
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
在这里我们可以看到,如果 OnNext
处理程序抛出异常,那么我们就不受 Rx OnError
处理程序的保护。
异常将不会得到处理,很可能会导致应用程序崩溃。
我有一个 asnyc
函数,我想对 IObservable
序列中的每个观察调用该函数,限制一次传递一个事件。消费者期望在传输过程中不超过一条消息;如果我理解正确的话,这也是 RX 合同。
考虑这个示例:
static void Main() {
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
//var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
var d = ob.Subscribe(x => Consume(x).Wait());
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> Consume(long count) {
Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
await Task.Delay(750);
Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
return Unit.Default;
}
Consume
函数伪造了 750 毫秒的处理时间,ob
每 100 毫秒产生一次事件。上面的代码有效,但在随机线程上调用 task.Wait()
。如果我改为在注释掉的第 3 行中订阅,那么 Consume
的调用速率与 ob
产生事件的速率相同(而且我什至无法理解我正在使用的 Subscribe
的重载在这个评论的声明中,所以这可能是胡说八道。
那么我如何正确地将一个事件从可观察序列传递到 async
函数?
订阅者不应该是长 运行,因此不支持在订阅处理程序中执行长 运行 异步方法。
相反,将您的异步方法视为从另一个序列获取值的单值可观察序列。 现在你可以组合序列了,这就是 Rx 的设计目标。
现在您已经实现了这一飞跃,您可能会得到类似于@Reijher 在 Howto call back async function from rx subscribe? 中创建的内容。
他的代码分解如下
//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
//Project the event you receive, into the result of the async method
.Select(l => Observable.FromAsync(() => asyncMethod(l)))
//Ensure that the results are serialized
.Concat()
//do what you will here with the results of the async method calls
.Subscribe();
在这种情况下,您正在创建隐式队列。 在生产者比消费者快的任何问题中,都需要使用队列在等待时收集值。 我个人更喜欢通过将数据放入队列来明确这一点。 或者,您可以显式使用调度程序来发出信号,表明线程模型应该弥补松弛。
对于 Rx 新手来说,这似乎是一个流行的障碍(在订阅处理程序中执行异步)。 指南不将它们放入您的订阅者中的原因有很多,例如: 1. 你破坏了错误模型 2. 你正在混合异步模型(这里是 rx,那里是 task) 3. subscribe 是异步序列组合的消费者。异步方法只是一个单值序列,因此该视图不能作为序列的结尾,但结果可能是。
更新
为了说明关于打破错误模型的评论,这里更新了 OP 示例。
void Main()
{
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
var d = ob.Subscribe(
x => ConsumeThrows(x).Wait(),
ex=> Console.WriteLine("I will not get hit"));
Thread.Sleep(10000);
d.Dispose();
}
static async Task<Unit> ConsumeThrows(long count)
{
return await Task.FromException<Unit>(new Exception("some failure"));
//this will have the same effect of bringing down the application.
//throw new Exception("some failure");
}
在这里我们可以看到,如果 OnNext
处理程序抛出异常,那么我们就不受 Rx OnError
处理程序的保护。
异常将不会得到处理,很可能会导致应用程序崩溃。