如何等待可观察对象完成?

How to wait for an observable to complete?

在Node.js中,您可以设置服务器,只要服务器在事件循环中处于活动状态,进程就不会终止。我想知道是否可以使用响应式扩展来做类似的事情?我知道如何设置命名管道并使用它与另一个进程中的节点服务器通信,但我不确定如何防止程序通过等待按键以外的任何方式终止。我想将该服务器实现为反应管道的一部分。是否可以阻塞主线程直到管道完成?

open System
open System.Threading
open FSharp.Control.Reactive

module Observable = 
    /// Blocks the thread until the observable completes. 
    let waitUnit on_next (o : IObservable<_>) =
        use t = new ManualResetEventSlim()
        use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
        t.Wait()

Observable.interval (TimeSpan.FromSeconds(1.0))
|> Observable.take 3
|> Observable.waitUnit (printfn "%i...")

printfn "Done."

以上使用的是F# Rx bindings,但C#版本会类似。库本身有 Observable.wait 类型 IObservable<'a> -> 'a,但它的缺点是在空序列上抛出异常并在内存中保留最新值,直到 observable 被处理。如果最终值不重要,这一个具有正确的语义。

我已经深入了解,wait 使用 ManualResetEventSlim 来阻止它所在的线程,所以这应该没问题。

抱歉,我无法在 F# 中给出答案,但在 C# 中这很简单。

如果我有这个例子:

Observable
    .Range(0, 10)
    .Subscribe(x => Console.WriteLine(x));

...我想等到它完成,我可以这样重写:

    Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray()
        .Wait();

...或者:

    await Observable
        .Range(0, 10)
        .Do(x => Console.WriteLine(x))
        .ToArray();

正如 Marko 在评论中指出的那样,这种方法创建的数组可能非常大。为了解决这个问题,您可以将 .ToArray() 替换为 .LastAsync()(在一般使用 Async 之前命名,表示 Task - 在这种情况下它只是 returns observable 的最后一个元素作为 IObservable<T>).

.Wait() 确实会抛出一个空的可观察对象,但 await 版本不会。