当所有订阅者在 RxJava2 上收到项目时,如何触发事件?
How to trigger an event when all subscribers received the item on RxJava2?
我在 RxJava 上开发了一个 consumer/producer,基于下面的文章 rxjava2 producer-consumer example。
但是我的用例有点复杂。
我将使用多个订阅者,每个订阅者都有某种过滤器(订阅者的数量及其过滤器会在运行时发生变化),并且会有一些不会被任何订阅者处理的事件.
我需要获取所有剩余项目(不符合任何订阅者的条件)来进行一些 post 处理。
我尝试在生产者上使用方法 "onAfterNext()",但是该方法在订阅者处理项目之前执行
有办法吗?
使用 PublishSubject
class 添加我的听众解决了这个问题。
PublishSubject<SQSMessage<String>> ps = PublishSubject.create()
ps.publish();
ps.subscribe(m -> System.err.println(m));
flowable.parallel()
.runOn(Schedulers.io())
.map(item -> {
// This map sends the object to my listeners on PublishSubject
ps.onNext(item);
return item;
})
.sequential()
// This map does the post processing after
// the object was sent to the listeners
.map(s -> {//do post process stuff})
.subscribe();
我在 RxJava 上开发了一个 consumer/producer,基于下面的文章 rxjava2 producer-consumer example。
但是我的用例有点复杂。
我将使用多个订阅者,每个订阅者都有某种过滤器(订阅者的数量及其过滤器会在运行时发生变化),并且会有一些不会被任何订阅者处理的事件.
我需要获取所有剩余项目(不符合任何订阅者的条件)来进行一些 post 处理。
我尝试在生产者上使用方法 "onAfterNext()",但是该方法在订阅者处理项目之前执行
有办法吗?
使用 PublishSubject
class 添加我的听众解决了这个问题。
PublishSubject<SQSMessage<String>> ps = PublishSubject.create()
ps.publish();
ps.subscribe(m -> System.err.println(m));
flowable.parallel()
.runOn(Schedulers.io())
.map(item -> {
// This map sends the object to my listeners on PublishSubject
ps.onNext(item);
return item;
})
.sequential()
// This map does the post processing after
// the object was sent to the listeners
.map(s -> {//do post process stuff})
.subscribe();