如何使用 async/await OnNext/OnError/OnCompleted 方法实现 IObserver?

How to implement an IObserver with async/await OnNext/OnError/OnCompleted methods?

我正在尝试为 System.Net.WebSocket 编写一个扩展方法,使用 Reactive Extensions (Rx.NET) 将它变成一个 IObserver。您可以看到下面的代码:

public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
    // Wrap the web socket in an interface that's a little easier to manage
    var webSocketMessageStream = new WebSocketMessageStream(webSocket);

    // Create the output stream to the client
    return Observer.Create<T>(
        onNext:      async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
        onError:     async error   => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
        onCompleted: async ()      => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
    );
}

此代码 有效 ,但我担心在 onNext、onError 和 onCompleted lambda 内部使用 async/await。我知道这个 returns 一个 async void lambda,这是不受欢迎的(有时会导致我已经 运行 遇到的问题)。

我一直在阅读 Rx.NET 文档以及互联网上的博客文章,但我似乎找不到使用 async/await 的正确方法(如果有的话) IObserver 中的方法。有没有正确的方法来做到这一点?如果没有,那么我该如何解决这个问题?

订阅不采用async方法。所以这里发生的是您正在使用 async void 中的即发即弃机制。问题是 onNext 消息将不再被序列化。

与其在 Subscribe 中调用 async 方法,不如将其包装到管道中以允许 Rx 等待它。

可以在 onErroronCompleted 上使用即发即弃,因为它们保证是从 Observable 调用的最后一个东西。请记住,与 Observable 关联的资源可以在 onErroronCompleted 返回后,在它们完成之前清理。

我写了一个小的扩展方法来完成这一切:

    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
            .Subscribe(
            e => { }, // empty
            onError,
            onCompleted);
    }

首先我们将 async onNext 方法转换为 Observable 方法,桥接两个异步世界。这意味着 onNext 内的 async/await 将受到尊重。接下来我们将方法包装到 Defer 中,这样它就不会在创建时立即启动。相反 Concat 只会在前一个完成时调用下一个延迟的可观察对象。

Ps。我希望 Rx.Net 的下一个版本将支持 async 订阅。