ConfigureAwait 用于 IObservable<T>

ConfigureAwait for IObservable<T>

当我将阻塞代码与 Task.Wait() 一起使用时,我遇到了死锁,等待内部等待 Rx LINQ 查询的 async 方法。

这是一个例子:

public void BlockingCode() 

{

    this.ExecuteAsync().Wait();

}

public async Task ExecuteAsync() 

{

    await this.service.GetFooAsync().ConfigureAwait(false);

    //This is the RX query that doesn't support ConfigureAwaitawait 
    await this.service.Receiver
      .FirstOrDefaultAsync(x => x == "foo")
      .Timeout(TimeSpan.FromSeconds(1));

}

所以,我的问题是,在可等待的 IObservable 上是否有任何等效的 ConfigureAwait 来确保不会在相同的 SynchronizationContext.

上恢复继续

ConfigureAwait 与等待者本身没有直接关系,而是 TPL 的一项功能,用于配置 Task 应如何完成。这是有问题的,因为这个 TPL 方法没有 return 一个新的 Task,所以你不能将它与一个可观察的转换组合起来。

Rx 本身基本上是自由线程的。您可以通过比 Tasks 更精细的控制来控制订阅和事件期间使用的线程 - 有关更多信息,请参见此处:ObserveOn and SubscribeOn - where the work is being done

很难修复你的代码,因为你没有提供一个小而完整的工作示例 - 然而,Rx 中的内置函数永远不会尝试编组到一个特定的线程,除非你特别告诉他们使用以上运算符之一。

如果你组合像 Observable.FromAsync 这样的运算符来将 Task 转换为可观察的,你可以使用 Observable.SubscribeOn(Scheduler.Default) 从当前 [=18] 开始 Task =].

这里有一个要点(为 LINQPad 设计,使用 nuget 包 rx-main 运行):https://gist.github.com/james-world/82c3cc39babab7870f6d

你必须理解"awaiting an Observable"是什么意思。查看 this。所以,从语义上讲,你的代码

await this.service.Receiver
    .FirstOrDefaultAsync(x => x == "foo")
    .Timeout(TimeSpan.FromSeconds(1));

等同于

await this.service.Receiver
    .FirstOrDefaultAsync(x => x == "foo")
    .Timeout(TimeSpan.FromSeconds(1))
    .LastAsync()
    .ToTask();

(注意这里有些冗余,调用FirstOrDefaultAsyncLastAsync不过没问题)。

至此,您完成了任务(如果可用,可能需要额外的 CancellationToken)。您现在可以使用 ConfigureAwait