来自 StackExchange Redis Pub Sub 订阅的可观察流
Observable stream from StackExchange Redis Pub Sub subscription
OBJECTIVE:
我正在使用 StackExchange Redis 客户端。我的 objective 是从客户端公开的 Pub Sub 订阅者创建一个 Observable 流,然后可以反过来支持 Observables 的 1-n 订阅,每个订阅者都有自己的通过 LINQ 过滤。 (发布正在按计划进行,问题完全与订阅特定频道上的事件流有关。)
背景:
我正在使用 Redis Pub Sub 作为事件源 CQRS 应用程序的一部分。具体用例是向多个订阅者发布事件,然后更新各种阅读模型、发送电子邮件等。
这些订阅者中的每一个都需要过滤他们处理的事件类型,为此我希望使用 Rx .Net(反应性扩展)和 LINQ 来
在事件流上提供过滤条件,以有效地处理仅对感兴趣的事件做出反应。使用这种方法消除了使用事件总线实现注册处理程序的需要,并允许我通过部署每个具有 1 的 1-n 微服务来向系统添加新的预测-n Observables 使用它们自己特定的过滤器订阅事件流。
我尝试了什么:
1) 我创建了一个 class 继承自 ObservableBase,覆盖了 SubscribeCore 方法,该方法接收来自 Observables 的订阅请求,将它们存储在 ConcurrentDictionary 中,并且当每个 Redis 通知从通道到达时,循环遍历注册的 Observable 订阅者并调用他们的 OnNext 方法传递 RedisValue。
2) 我创建了一个 Subject,它也接受来自 Observables 的订阅,并调用它们的 OnNext 方法。同样,许多人似乎不赞成使用 Subjects。
问题:
我尝试过的方法确实有效(至少表面上如此),具有不同的性能水平,但是 feel like a hack
,并且我没有按照预期的方式使用 Rx。
我看到很多评论说应该尽可能使用内置的 Observable 方法,例如 Observable.FromEvent,但这似乎无法通过 StackExchange Redis 客户端订阅来实现 API,至少在我看来是这样。
我也明白接收流并转发到 multiple 观察者的首选方法是使用 ConnectableObservable,这看起来专为我面临的场景而设计(每个微服务将在内部订阅 1-n Observables)。目前,我不知道如何将 ConnectableObservable 连接到来自 StackExchange Redis 的通知,或者它是否比 Observable[=53= 提供真正的好处].
更新:
尽管在我的场景中完成不是问题(处置很好),但错误处理很重要;例如隔离在一个订阅者中检测到的错误,以防止所有订阅终止。
这是一种扩展方法,您可以使用它从 ISubscriber
和 RedisChannel
创建 IObservable<RedisValue>
:
public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel)
{
return Observable.Create<RedisValue>(async (obs, ct) =>
{
// as the SubscribeAsync callback can be invoked concurrently
// a thread-safe wrapper for OnNext is needed
var syncObs = Observer.Synchronize(obs);
await subscriber.SubscribeAsync(channel, (_, message) =>
{
syncObs.OnNext(message);
}).ConfigureAwait(false);
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
由于 Redis 频道没有完成,结果 IObservable
将永远不会完成,但是您可以放弃 IDisposable
订阅以取消订阅 Redis 频道(这将由许多 Rx 自动完成运算符)。
用法可能是这样的:
var subscriber = connectionMultiplexer.GetSubscriber();
var gotMessage = await subscriber.WhenMessageReceived("my_channel")
.AnyAsync(msg => msg == "expected_message")
.ToTask()
.ConfigureAwait(false);
或者按照你的例子:
var subscriber = connectionMultiplexer.GetSubscriber();
var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
.Select(msg => ParseEventFromMessage(msg))
.Where(evt => evt.Type == EventType.SendEmails);
await sendEmailEvents.ForEachAsync(evt =>
{
SendEmails(evt);
}).ConfigureAwait(false);
其他微服务的过滤方式可能不同。
OBJECTIVE:
我正在使用 StackExchange Redis 客户端。我的 objective 是从客户端公开的 Pub Sub 订阅者创建一个 Observable 流,然后可以反过来支持 Observables 的 1-n 订阅,每个订阅者都有自己的通过 LINQ 过滤。 (发布正在按计划进行,问题完全与订阅特定频道上的事件流有关。)
背景:
我正在使用 Redis Pub Sub 作为事件源 CQRS 应用程序的一部分。具体用例是向多个订阅者发布事件,然后更新各种阅读模型、发送电子邮件等。
这些订阅者中的每一个都需要过滤他们处理的事件类型,为此我希望使用 Rx .Net(反应性扩展)和 LINQ 来 在事件流上提供过滤条件,以有效地处理仅对感兴趣的事件做出反应。使用这种方法消除了使用事件总线实现注册处理程序的需要,并允许我通过部署每个具有 1 的 1-n 微服务来向系统添加新的预测-n Observables 使用它们自己特定的过滤器订阅事件流。
我尝试了什么:
1) 我创建了一个 class 继承自 ObservableBase,覆盖了 SubscribeCore 方法,该方法接收来自 Observables 的订阅请求,将它们存储在 ConcurrentDictionary 中,并且当每个 Redis 通知从通道到达时,循环遍历注册的 Observable 订阅者并调用他们的 OnNext 方法传递 RedisValue。
2) 我创建了一个 Subject,它也接受来自 Observables 的订阅,并调用它们的 OnNext 方法。同样,许多人似乎不赞成使用 Subjects。
问题:
我尝试过的方法确实有效(至少表面上如此),具有不同的性能水平,但是 feel like a hack
,并且我没有按照预期的方式使用 Rx。
我看到很多评论说应该尽可能使用内置的 Observable 方法,例如 Observable.FromEvent,但这似乎无法通过 StackExchange Redis 客户端订阅来实现 API,至少在我看来是这样。
我也明白接收流并转发到 multiple 观察者的首选方法是使用 ConnectableObservable,这看起来专为我面临的场景而设计(每个微服务将在内部订阅 1-n Observables)。目前,我不知道如何将 ConnectableObservable 连接到来自 StackExchange Redis 的通知,或者它是否比 Observable[=53= 提供真正的好处].
更新:
尽管在我的场景中完成不是问题(处置很好),但错误处理很重要;例如隔离在一个订阅者中检测到的错误,以防止所有订阅终止。
这是一种扩展方法,您可以使用它从 ISubscriber
和 RedisChannel
创建 IObservable<RedisValue>
:
public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel)
{
return Observable.Create<RedisValue>(async (obs, ct) =>
{
// as the SubscribeAsync callback can be invoked concurrently
// a thread-safe wrapper for OnNext is needed
var syncObs = Observer.Synchronize(obs);
await subscriber.SubscribeAsync(channel, (_, message) =>
{
syncObs.OnNext(message);
}).ConfigureAwait(false);
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
由于 Redis 频道没有完成,结果 IObservable
将永远不会完成,但是您可以放弃 IDisposable
订阅以取消订阅 Redis 频道(这将由许多 Rx 自动完成运算符)。
用法可能是这样的:
var subscriber = connectionMultiplexer.GetSubscriber();
var gotMessage = await subscriber.WhenMessageReceived("my_channel")
.AnyAsync(msg => msg == "expected_message")
.ToTask()
.ConfigureAwait(false);
或者按照你的例子:
var subscriber = connectionMultiplexer.GetSubscriber();
var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
.Select(msg => ParseEventFromMessage(msg))
.Where(evt => evt.Type == EventType.SendEmails);
await sendEmailEvents.ForEachAsync(evt =>
{
SendEmails(evt);
}).ConfigureAwait(false);
其他微服务的过滤方式可能不同。