当所有订阅者在 RxJava2 上收到项目时,如何触发事件?

How to trigger an event when all subscribers received the item on RxJava2?

我在 RxJava 上开发了一个 consumer/producer,基于下面的文章 rxjava2 producer-consumer example

但是我的用例有点复杂。

我将使用多个订阅者,每个订阅者都有某种过滤器(订阅者的数量及其过滤器会在运行时发生变化),并且会有一些不会被任何订阅者处理的事件.

我需要获取所有剩余项目(不符合任何订阅者的条件)来进行一些 post 处理。

我尝试在生产者上使用方法 "onAfterNext()",但是该方法在订阅者处理项目之前执行

有办法吗?

使用 PublishSubject class 添加我的听众解决了这个问题。

PublishSubject<SQSMessage<String>> ps = PublishSubject.create()
ps.publish();
ps.subscribe(m -> System.err.println(m));

flowable.parallel()
    .runOn(Schedulers.io())
    .map(item -> {   
        // This map sends the object to my listeners on PublishSubject
        ps.onNext(item);
       return item;
    })
    .sequential()
    // This map does the post processing after
    // the object was sent to the listeners
    .map(s -> {//do post process stuff})
    .subscribe();