由于许多订阅,Flux 发送重复项

Flux sending duplicates due to many subscriptions

我正在尝试使用 Flux 作为某种队列系统。我希望能够将数据推送到 Flux 并稍后订阅。大多数时候我会推送 1 个元素并在之后立即订阅,但在某些情况下我需要等待 2/3 元素并在订阅之前对它们进行一些处理。 我已经设法让它与 UnicastProcessorFlux 一起工作,如下所示:

private final FluxProcessor<Message> processor = UnicastProcessor.create();
private final Flux<Message> messages = processor.publish().autoconnect();

public void add(Message message){
    processor.onNext(message);
}

public void flush(){
   messages.flatMap(this::doSomeWork)
           .doOnNext(System.out::println)
           .subscribe();
}

public void flush(int buffer){
    messages.take(buffer)
            .flatMap(this::doSomeWork)
            .subscribe();
}

此代码有效,但它会堆积订阅,因此它会在我每次订阅时打印相同的消息 x 次。我相信这是订阅 Flux 的假定行为,但这不是我想要实现的,而且我似乎无法找到如何做到这一点。

我试图实现的一个例子如下: (每个数字代表一条新消息被推送到处理器)

--------1-----(*subscribe here*)----2---3---(*subscribe here*)-----4----(*subscribe here*)

输出如下:

1

2
3

4

而不是我得到的:

1

2
3
2
3

4
4
4

关于如何实现此目标的任何提示?

您之所以会得到这样的结果,是因为您没有退订之前的订阅,因此它们仍然有效并仍在处理您的消息。

要退订它们,您可以更改 flush() 方法:

public Disposable flush(){
    return messages
            .doOnNext(System.out::println)
            .subscribe();
}

现在,您可以调用 dispose() 方法,该方法将在不再需要时取消您的订阅。

public void run() {
    add(1);
    flush().dispose();
    System.out.println();
    add(2);
    add(3);
    flush().dispose();
    System.out.println();
    add(4);
    add(5);
    add(6);
    flush().dispose();
    System.out.println();
    add(7);
    flush().dispose();
}

结果为:

1

2
3

4
5
6

7