如何将实体推送到 Rx Observable 上?

How do I push an entity onto an Rx Observable?

我有一个 class 负责以频繁但不规则的间隔生成事件,其他 classes 必须使用和操作。我想使用 Reactive Extensions 来完成这项任务。

这在消费者方面非常简单;我让我的消费者 class 实施 IObserver<Payload>,一切似乎都很好。问题出在生产者 class 身上。

直接实现 IObservable<Payload>(即,根据文档,不推荐使用我自己的 IDisposable Subscribe(IObserver<Payload> ) 实现。它建议与 Observable.Create() 函数集组合. 因为我的 class 会 运行 很长一段时间,我尝试用 var myObservable = Observable.Never() 创建一个 Observable,然后,当我有新的有效载荷可用时,调用 myObservable.Publish(payloadData)。但是,当我这样做时,我似乎没有在我的消费者中实现 OnNext

我认为,作为变通方法,我可以在 class 中创建一个事件,然后使用 FromEvent 函数创建 Observable,但这似乎是一种过于复杂的方法(也就是说,Observables 'requires' 事件的新热点似乎很奇怪)。我在这里忽略了一种简单的方法吗? 'standard' 创建自己的 Observable 源的方法是什么?

Observable.Never() 没有 "send" 通知,您应该使用 Observable.Return(yourValue)

如果您需要带有具体示例的指南,我建议您阅读 Intro to Rx

除非我找到更好的方法,否则我现在决定使用 BlockingCollection

var _itemsToSend = new BlockingCollection<Payload>();
IObservable<MessageQueuePayload> _deliverer =
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);

创建一个 new Subject<Payload> 并调用它的 OnNext 方法来发送事件。您可以 Subscribe 与您的观察员讨论该主题。

主题的使用经常引起争论。有关 Subjects(链接到入门)使用的全面讨论,请参阅 here - 但总而言之,此用例听起来有效(即它可能符合本地 + 热门标准)。

顺便说一句,订阅接受委托的重载可能会消除您提供 IObserver<T>.

实现的需要