ConfigureAwait 用于 IObservable<T>
ConfigureAwait for IObservable<T>
当我将阻塞代码与 Task.Wait()
一起使用时,我遇到了死锁,等待内部等待 Rx LINQ 查询的 async
方法。
这是一个例子:
public void BlockingCode()
{
this.ExecuteAsync().Wait();
}
public async Task ExecuteAsync()
{
await this.service.GetFooAsync().ConfigureAwait(false);
//This is the RX query that doesn't support ConfigureAwaitawait
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1));
}
所以,我的问题是,在可等待的 IObservable 上是否有任何等效的 ConfigureAwait 来确保不会在相同的 SynchronizationContext
.
上恢复继续
ConfigureAwait
与等待者本身没有直接关系,而是 TPL 的一项功能,用于配置 Task
应如何完成。这是有问题的,因为这个 TPL 方法没有 return 一个新的 Task
,所以你不能将它与一个可观察的转换组合起来。
Rx 本身基本上是自由线程的。您可以通过比 Tasks
更精细的控制来控制订阅和事件期间使用的线程 - 有关更多信息,请参见此处:ObserveOn and SubscribeOn - where the work is being done
很难修复你的代码,因为你没有提供一个小而完整的工作示例 - 然而,Rx 中的内置函数永远不会尝试编组到一个特定的线程,除非你特别告诉他们使用以上运算符之一。
如果你组合像 Observable.FromAsync
这样的运算符来将 Task
转换为可观察的,你可以使用 Observable.SubscribeOn(Scheduler.Default)
从当前 [=18] 开始 Task
=].
这里有一个要点(为 LINQPad 设计,使用 nuget 包 rx-main 运行):https://gist.github.com/james-world/82c3cc39babab7870f6d
你必须理解"awaiting an Observable"是什么意思。查看 this。所以,从语义上讲,你的代码
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1));
等同于
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1))
.LastAsync()
.ToTask();
(注意这里有些冗余,调用FirstOrDefaultAsync
和LastAsync
不过没问题)。
至此,您完成了任务(如果可用,可能需要额外的 CancellationToken
)。您现在可以使用 ConfigureAwait
。
当我将阻塞代码与 Task.Wait()
一起使用时,我遇到了死锁,等待内部等待 Rx LINQ 查询的 async
方法。
这是一个例子:
public void BlockingCode()
{
this.ExecuteAsync().Wait();
}
public async Task ExecuteAsync()
{
await this.service.GetFooAsync().ConfigureAwait(false);
//This is the RX query that doesn't support ConfigureAwaitawait
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1));
}
所以,我的问题是,在可等待的 IObservable 上是否有任何等效的 ConfigureAwait 来确保不会在相同的 SynchronizationContext
.
ConfigureAwait
与等待者本身没有直接关系,而是 TPL 的一项功能,用于配置 Task
应如何完成。这是有问题的,因为这个 TPL 方法没有 return 一个新的 Task
,所以你不能将它与一个可观察的转换组合起来。
Rx 本身基本上是自由线程的。您可以通过比 Tasks
更精细的控制来控制订阅和事件期间使用的线程 - 有关更多信息,请参见此处:ObserveOn and SubscribeOn - where the work is being done
很难修复你的代码,因为你没有提供一个小而完整的工作示例 - 然而,Rx 中的内置函数永远不会尝试编组到一个特定的线程,除非你特别告诉他们使用以上运算符之一。
如果你组合像 Observable.FromAsync
这样的运算符来将 Task
转换为可观察的,你可以使用 Observable.SubscribeOn(Scheduler.Default)
从当前 [=18] 开始 Task
=].
这里有一个要点(为 LINQPad 设计,使用 nuget 包 rx-main 运行):https://gist.github.com/james-world/82c3cc39babab7870f6d
你必须理解"awaiting an Observable"是什么意思。查看 this。所以,从语义上讲,你的代码
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1));
等同于
await this.service.Receiver
.FirstOrDefaultAsync(x => x == "foo")
.Timeout(TimeSpan.FromSeconds(1))
.LastAsync()
.ToTask();
(注意这里有些冗余,调用FirstOrDefaultAsync
和LastAsync
不过没问题)。
至此,您完成了任务(如果可用,可能需要额外的 CancellationToken
)。您现在可以使用 ConfigureAwait
。