如何在 Rx 中使用异步方法订阅?
How to Subscribe with async method in Rx?
我有以下代码:
IObservable<Data> _source;
...
_source.Subscribe(StoreToDatabase);
private async Task StoreToDatabase(Data data) {
await dbstuff(data);
}
但是,这不会编译。有什么方法可以异步观察数据吗?我试过async void
,可以,但我觉得给定的方案不可行。
我也检查了 Reactive Extensions Subscribe calling await,但它没有回答我的问题(我不关心 SelectMany
结果。)
您不必关心 SelectMany
结果。答案仍然是一样的……尽管您需要您的任务具有 return 类型(即 Task<T>
,而不是 Task
)。
Unit
本质上等同于 void
,因此您可以使用:
_source.SelectMany(StoreToDatabase).Subscribe();
private async Task<Unit> StoreToDatabase(Data data)
{
await dbstuff(data);
return Unit.Default;
}
此 SelectMany
重载接受 Func<TSource, Task<TResult>
意味着在任务完成之前生成的序列不会完成。
我一直在使用TPL DataFlow来控制背压,并用它来解决这个问题。
关键部分是ITargetBlock<TInput>.AsObserver()
- source.
// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
{
Console.WriteLine($"Received {p}");
await Task.Delay(1000);
Console.WriteLine($"Finished handling {p}");
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());
// Await completion of the block
await targetBlock.Completion;
这里的重要部分是 ActionBlock 的边界容量设置为 1。这可以防止块一次接收多个项目,并且 will block OnNext 如果一个项目已经在处理中!
让我大吃一惊的是,在您的订阅中调用 Task.Wait
和 Task.Result
是安全的。显然,如果您调用了 ObserverOnDispatcher()
或类似的方法,您可能会遇到死锁。小心!
迟到的答案,但我认为以下扩展方法正确封装了 Charles Mager 在 中提出的内容:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<Task> asyncAction, Action<Exception> handler = null)
{
Func<T,Task<Unit>> wrapped = async t =>
{
await asyncAction();
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<T,Task> asyncAction, Action<Exception> handler = null)
{
Func<T, Task<Unit>> wrapped = async t =>
{
await asyncAction(t);
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
所以您想要 运行 存储数据过程,可能是一些其他过程并异步等待完成或部分结果。此处显示的创建构造函数如何:
IObservable<Int32> NotifyOfStoringProgress =
Observable.Create(
(Func<IObserver<Int32>, Task>)
(async (ObserverToFeed) =>
{
ObserverToFeed.OnNext(-1);
Task StoreToDataBase = Task.Run(()=> {;});
ObserverToFeed.OnNext(0);
;;
await StoreToDataBase;
ObserverToFeed.OnNext(1);
;;
}));
NotifyOfStoringProgress.Subscribe(onNext: Notification => {;});
我有以下代码:
IObservable<Data> _source;
...
_source.Subscribe(StoreToDatabase);
private async Task StoreToDatabase(Data data) {
await dbstuff(data);
}
但是,这不会编译。有什么方法可以异步观察数据吗?我试过async void
,可以,但我觉得给定的方案不可行。
我也检查了 Reactive Extensions Subscribe calling await,但它没有回答我的问题(我不关心 SelectMany
结果。)
您不必关心 SelectMany
结果。答案仍然是一样的……尽管您需要您的任务具有 return 类型(即 Task<T>
,而不是 Task
)。
Unit
本质上等同于 void
,因此您可以使用:
_source.SelectMany(StoreToDatabase).Subscribe();
private async Task<Unit> StoreToDatabase(Data data)
{
await dbstuff(data);
return Unit.Default;
}
此 SelectMany
重载接受 Func<TSource, Task<TResult>
意味着在任务完成之前生成的序列不会完成。
我一直在使用TPL DataFlow来控制背压,并用它来解决这个问题。
关键部分是ITargetBlock<TInput>.AsObserver()
- source.
// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
{
Console.WriteLine($"Received {p}");
await Task.Delay(1000);
Console.WriteLine($"Finished handling {p}");
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());
// Await completion of the block
await targetBlock.Completion;
这里的重要部分是 ActionBlock 的边界容量设置为 1。这可以防止块一次接收多个项目,并且 will block OnNext 如果一个项目已经在处理中!
让我大吃一惊的是,在您的订阅中调用 Task.Wait
和 Task.Result
是安全的。显然,如果您调用了 ObserverOnDispatcher()
或类似的方法,您可能会遇到死锁。小心!
迟到的答案,但我认为以下扩展方法正确封装了 Charles Mager 在
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<Task> asyncAction, Action<Exception> handler = null)
{
Func<T,Task<Unit>> wrapped = async t =>
{
await asyncAction();
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<T,Task> asyncAction, Action<Exception> handler = null)
{
Func<T, Task<Unit>> wrapped = async t =>
{
await asyncAction(t);
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
所以您想要 运行 存储数据过程,可能是一些其他过程并异步等待完成或部分结果。此处显示的创建构造函数如何:
IObservable<Int32> NotifyOfStoringProgress =
Observable.Create(
(Func<IObserver<Int32>, Task>)
(async (ObserverToFeed) =>
{
ObserverToFeed.OnNext(-1);
Task StoreToDataBase = Task.Run(()=> {;});
ObserverToFeed.OnNext(0);
;;
await StoreToDataBase;
ObserverToFeed.OnNext(1);
;;
}));
NotifyOfStoringProgress.Subscribe(onNext: Notification => {;});