Observable.FromAsync 对比 Task.ToObservable

Observable.FromAsync vs Task.ToObservable

有没有人知道何时使用其中一种方法而不是另一种方法。他们似乎做同样的事情,因为他们从 TPL Task 转换为 Observable

Observable.FromAsync 似乎支持取消令牌,这可能是允许生成任务的方法参与协作取消的细微差别,如果可观察对象被处置。

只是想知道我是否遗漏了一些明显的东西,为什么你会用一个而不是另一个。

谢谢

查看代码,Observable.FromAsync 似乎(至少在某些流程中)调用了 .ToObservable()*。我确信它们在语义上应该是等价的(假设您传递相同的参数,例如 Scheduler、CancellationToken 等)。

一个更适合 chaining/fluent 语法,一个单独阅读可能更好。无论您喜欢哪种编码风格。

*https://github.com/Reactive-Extensions/Rx.NET/blob/859e6159cb07be67fd36b18c2ae2b9a62979cb6d/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs#L727

Observable.FromAsync 接受 Func<Task>Func<Task<TResult>> 形式的 TaskFactory, 在这种情况下,任务仅在订阅可观察对象时创建和执行。

其中 .ToObservable() 需要一个已经创建(并因此启动)的任务。

@Sickboy 回答正确。

  • Observable.FromAsync()会在订阅的那一刻开始任务。
  • Task.ToObservable() 需要一个已经 运行 的任务。

Observable.FromAsync 的一个用途是控制对异步方法的多次调用的重入。

这是这两种方法不等效的示例:

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Here, ExecuteQueryAsync will run **serially**, the second call will start
//only when the first one is already finished. This is an important property
//if ExecuteQueryAsync doesn't support reentrancy
ob
.Select(x => Observable.FromAsync(() => ExecuteQueryAsync(x))
.Concat()
.ObserveOnDispatcher()
.Subscribe(action)

对比

//ob is some IObservable<T>

//ExecuteQueryAsync is some async method
//Even when the `Subscribe` action order will be the same as the first 
//example because of the `Concat`, ExecuteQueryAsync calls could be     
//parallel, the second call to the method could start before the end of the 
//first call. 
.Select(x => ExecuteQueryAsync(x).ToObservable())
.Concat()
.Subscribe(action)

请注意,在第一个示例中,可能需要 ObserveOn()ObserveOnDispatcher() 方法来确保 action 在原始调度程序上执行,因为 Observable.FromAsync不等待任务,因此继续在任何可用的调度程序上执行

除了能够使用 CancellationToken 之外,FromAsync 包装在延迟中,因此这允许根据订阅时的条件更改任务逻辑。请注意,任务不会启动,内部 task.ToObservable 将被调用。 Func 确实允许您在创建任务时启动任务。