如何在 Rx 中使用异步方法订阅?

How to Subscribe with async method in Rx?

我有以下代码:

IObservable<Data> _source;

...

_source.Subscribe(StoreToDatabase);

private async Task StoreToDatabase(Data data) {
    await dbstuff(data);
}

但是,这不会编译。有什么方法可以异步观察数据吗?我试过async void,可以,但我觉得给定的方案不可行。

我也检查了 Reactive Extensions Subscribe calling await,但它没有回答我的问题(我不关心 SelectMany 结果。)

您不必关心 SelectMany 结果。答案仍然是一样的……尽管您需要您的任务具有 return 类型(即 Task<T>,而不是 Task)。

Unit 本质上等同于 void,因此您可以使用:

_source.SelectMany(StoreToDatabase).Subscribe();

private async Task<Unit> StoreToDatabase(Data data)
{
    await dbstuff(data);
    return Unit.Default;
}

SelectMany 重载接受 Func<TSource, Task<TResult> 意味着在任务完成之前生成的序列不会完成。

我一直在使用TPL DataFlow来控制背压,并用它来解决这个问题。

关键部分是ITargetBlock<TInput>.AsObserver() - source.

// Set a block to handle each element
ITargetBlock<long> targetBlock = new ActionBlock<long>(async p =>
{
    Console.WriteLine($"Received {p}");
    await Task.Delay(1000);
    Console.WriteLine($"Finished handling {p}");
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Generate an item each second for 10 seconds
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);

// Subscribe with an observer created from the target block.
sequence.Subscribe(targetBlock.AsObserver());

// Await completion of the block
await targetBlock.Completion;

这里的重要部分是 ActionBlock 的边界容量设置为 1。这可以防止块一次接收多个项目,并且 will block OnNext 如果一个项目已经在处理中!

让我大吃一惊的是,在您的订阅中调用 Task.WaitTask.Result 是安全的。显然,如果您调用了 ObserverOnDispatcher() 或类似的方法,您可能会遇到死锁。小心!

迟到的答案,但我认为以下扩展方法正确封装了 Charles Mager 在 中提出的内容:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<Task> asyncAction, Action<Exception> handler = null)
{
    Func<T,Task<Unit>> wrapped = async t =>
    {
        await asyncAction();
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<T,Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction(t);
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

所以您想要 运行 存储数据过程,可能是一些其他过程并异步等待完成或部分结果。此处显示的创建构造函数如何:

IObservable<Int32> NotifyOfStoringProgress = 
Observable.Create(
(Func<IObserver<Int32>, Task>)
(async (ObserverToFeed) =>
{
    ObserverToFeed.OnNext(-1);    
    Task StoreToDataBase = Task.Run(()=> {;});
    ObserverToFeed.OnNext(0);    
    ;;
    await StoreToDataBase;
    ObserverToFeed.OnNext(1);
    ;;
}));

NotifyOfStoringProgress.Subscribe(onNext: Notification => {;});