在 F# 中等待 RX 主题

Awaiting an RX subject in F#

这与 How do I await a response from an RX Subject without introducing a race condition? 相同,但在 F# 中。

C# 解决方案如下所示:

static async void Foo()
{
    var subject = new Subject<int>();
    var firstInt = subject.FirstAsync().PublishLast();
    firstInt.Connect();
    subject.OnNext(42);

    var x = await firstInt;
    Console.WriteLine("Done waiting: " + x);
}

我在 F# 中的尝试是这样的:

let foo () =
    async {
        use subject = new Subject<int>()
        let firstInt = subject.FirstAsync().PublishLast()
        firstInt.Connect() |> ignore
        subject.OnNext(42)

        let! x = firstInt
        printfn "Done waiting: %d" x
        return ()
    }

let x! = firstInt 给出了编译错误 This expression was expected to have type Async<'a> but here has type IConnectableObservable<int> 所以显然 C# 做了一些 F# 没有做的事情。

这里是否有 C# 隐式接口强制转换,我需要在 F# 中显式执行?如果是这样,我不知道它是什么。

经过进一步挖掘,当您 await 某些东西时,C# 似乎会在幕后调用 GetAwaiter()。对于 SubjectIObservableGetAwaiter returns 和 AsyncSubject,这在 F# 中不是立即有用,但 ToTask 扩展方法在 System.Reactive.Threading.Tasks 中使它有用。显然,您可以将 ToTask 直接应用于 Subject(或 IObservable),而无需通过 GetAwaiter,因此我的问题通过更改 let! x ... 得到解决]声明:

    let! x = firstInt.ToTask() |> Async.AwaitTask

编辑:

有更好的方法

使用 FSharpx.Async 是完成同一件事的更好方法:

open FSharpx.Control.Observable

let foo () =
    async {
        use subject = new Subject<int>()
        subject.OnNext(42)

        let! x = Async.AwaitObservable subject
        printfn "Done waiting: %d" x
        return ()
    }