如何将实体推送到 Rx Observable 上?
How do I push an entity onto an Rx Observable?
我有一个 class 负责以频繁但不规则的间隔生成事件,其他 classes 必须使用和操作。我想使用 Reactive Extensions 来完成这项任务。
这在消费者方面非常简单;我让我的消费者 class 实施 IObserver<Payload>
,一切似乎都很好。问题出在生产者 class 身上。
直接实现 IObservable<Payload>
(即,根据文档,不推荐使用我自己的 IDisposable Subscribe(IObserver<Payload> )
实现。它建议与 Observable.Create()
函数集组合. 因为我的 class 会 运行 很长一段时间,我尝试用 var myObservable = Observable.Never()
创建一个 Observable,然后,当我有新的有效载荷可用时,调用 myObservable.Publish(payloadData)
。但是,当我这样做时,我似乎没有在我的消费者中实现 OnNext
。
我认为,作为变通方法,我可以在 class 中创建一个事件,然后使用 FromEvent
函数创建 Observable,但这似乎是一种过于复杂的方法(也就是说,Observables 'requires' 事件的新热点似乎很奇怪)。我在这里忽略了一种简单的方法吗? 'standard' 创建自己的 Observable 源的方法是什么?
Observable.Never()
没有 "send" 通知,您应该使用 Observable.Return(yourValue)
如果您需要带有具体示例的指南,我建议您阅读 Intro to Rx
除非我找到更好的方法,否则我现在决定使用 BlockingCollection
。
var _itemsToSend = new BlockingCollection<Payload>();
IObservable<MessageQueuePayload> _deliverer =
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);
创建一个 new Subject<Payload>
并调用它的 OnNext
方法来发送事件。您可以 Subscribe
与您的观察员讨论该主题。
主题的使用经常引起争论。有关 Subjects(链接到入门)使用的全面讨论,请参阅 here - 但总而言之,此用例听起来有效(即它可能符合本地 + 热门标准)。
顺便说一句,订阅接受委托的重载可能会消除您提供 IObserver<T>
.
实现的需要
我有一个 class 负责以频繁但不规则的间隔生成事件,其他 classes 必须使用和操作。我想使用 Reactive Extensions 来完成这项任务。
这在消费者方面非常简单;我让我的消费者 class 实施 IObserver<Payload>
,一切似乎都很好。问题出在生产者 class 身上。
直接实现 IObservable<Payload>
(即,根据文档,不推荐使用我自己的 IDisposable Subscribe(IObserver<Payload> )
实现。它建议与 Observable.Create()
函数集组合. 因为我的 class 会 运行 很长一段时间,我尝试用 var myObservable = Observable.Never()
创建一个 Observable,然后,当我有新的有效载荷可用时,调用 myObservable.Publish(payloadData)
。但是,当我这样做时,我似乎没有在我的消费者中实现 OnNext
。
我认为,作为变通方法,我可以在 class 中创建一个事件,然后使用 FromEvent
函数创建 Observable,但这似乎是一种过于复杂的方法(也就是说,Observables 'requires' 事件的新热点似乎很奇怪)。我在这里忽略了一种简单的方法吗? 'standard' 创建自己的 Observable 源的方法是什么?
Observable.Never()
没有 "send" 通知,您应该使用 Observable.Return(yourValue)
如果您需要带有具体示例的指南,我建议您阅读 Intro to Rx
除非我找到更好的方法,否则我现在决定使用 BlockingCollection
。
var _itemsToSend = new BlockingCollection<Payload>();
IObservable<MessageQueuePayload> _deliverer =
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default);
创建一个 new Subject<Payload>
并调用它的 OnNext
方法来发送事件。您可以 Subscribe
与您的观察员讨论该主题。
主题的使用经常引起争论。有关 Subjects(链接到入门)使用的全面讨论,请参阅 here - 但总而言之,此用例听起来有效(即它可能符合本地 + 热门标准)。
顺便说一句,订阅接受委托的重载可能会消除您提供 IObserver<T>
.