如何使用 async/await OnNext/OnError/OnCompleted 方法实现 IObserver?
How to implement an IObserver with async/await OnNext/OnError/OnCompleted methods?
我正在尝试为 System.Net.WebSocket 编写一个扩展方法,使用 Reactive Extensions (Rx.NET) 将它变成一个 IObserver。您可以看到下面的代码:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
此代码 有效 ,但我担心在 onNext、onError 和 onCompleted lambda 内部使用 async/await。我知道这个 returns 一个 async void lambda,这是不受欢迎的(有时会导致我已经 运行 遇到的问题)。
我一直在阅读 Rx.NET 文档以及互联网上的博客文章,但我似乎找不到使用 async/await 的正确方法(如果有的话) IObserver 中的方法。有没有正确的方法来做到这一点?如果没有,那么我该如何解决这个问题?
订阅不采用async
方法。所以这里发生的是您正在使用 async void
中的即发即弃机制。问题是 onNext 消息将不再被序列化。
与其在 Subscribe 中调用 async
方法,不如将其包装到管道中以允许 Rx 等待它。
可以在 onError
和 onCompleted
上使用即发即弃,因为它们保证是从 Observable
调用的最后一个东西。请记住,与 Observable
关联的资源可以在 onError
和 onCompleted
返回后,在它们完成之前清理。
我写了一个小的扩展方法来完成这一切:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
.Subscribe(
e => { }, // empty
onError,
onCompleted);
}
首先我们将 async onNext
方法转换为 Observable
方法,桥接两个异步世界。这意味着 onNext
内的 async/await 将受到尊重。接下来我们将方法包装到 Defer
中,这样它就不会在创建时立即启动。相反 Concat
只会在前一个完成时调用下一个延迟的可观察对象。
Ps。我希望 Rx.Net 的下一个版本将支持 async
订阅。
我正在尝试为 System.Net.WebSocket 编写一个扩展方法,使用 Reactive Extensions (Rx.NET) 将它变成一个 IObserver。您可以看到下面的代码:
public static IObserver<T> ToObserver<T>(this WebSocket webSocket, IWebSocketMessageSerializer<T> webSocketMessageSerializer)
{
// Wrap the web socket in an interface that's a little easier to manage
var webSocketMessageStream = new WebSocketMessageStream(webSocket);
// Create the output stream to the client
return Observer.Create<T>(
onNext: async message => await webSocketMessageStream.WriteMessageAsync(webSocketMessageSerializer.SerializeMessage(message)),
onError: async error => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.InternalServerError, string.Format("{0}: {1}", error.GetType(), error.Message)),
onCompleted: async () => await webSocketMessageStream.CloseAsync(WebSocketCloseStatus.NormalClosure, "Server disconnected")
);
}
此代码 有效 ,但我担心在 onNext、onError 和 onCompleted lambda 内部使用 async/await。我知道这个 returns 一个 async void lambda,这是不受欢迎的(有时会导致我已经 运行 遇到的问题)。
我一直在阅读 Rx.NET 文档以及互联网上的博客文章,但我似乎找不到使用 async/await 的正确方法(如果有的话) IObserver 中的方法。有没有正确的方法来做到这一点?如果没有,那么我该如何解决这个问题?
订阅不采用async
方法。所以这里发生的是您正在使用 async void
中的即发即弃机制。问题是 onNext 消息将不再被序列化。
与其在 Subscribe 中调用 async
方法,不如将其包装到管道中以允许 Rx 等待它。
可以在 onError
和 onCompleted
上使用即发即弃,因为它们保证是从 Observable
调用的最后一个东西。请记住,与 Observable
关联的资源可以在 onError
和 onCompleted
返回后,在它们完成之前清理。
我写了一个小的扩展方法来完成这一切:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Select(e => Observable.Defer(() => onNext(e).ToObservable())).Concat()
.Subscribe(
e => { }, // empty
onError,
onCompleted);
}
首先我们将 async onNext
方法转换为 Observable
方法,桥接两个异步世界。这意味着 onNext
内的 async/await 将受到尊重。接下来我们将方法包装到 Defer
中,这样它就不会在创建时立即启动。相反 Concat
只会在前一个完成时调用下一个延迟的可观察对象。
Ps。我希望 Rx.Net 的下一个版本将支持 async
订阅。