由于许多订阅,Flux 发送重复项
Flux sending duplicates due to many subscriptions
我正在尝试使用 Flux
作为某种队列系统。我希望能够将数据推送到 Flux
并稍后订阅。大多数时候我会推送 1 个元素并在之后立即订阅,但在某些情况下我需要等待 2/3 元素并在订阅之前对它们进行一些处理。
我已经设法让它与 UnicastProcessor
和 Flux
一起工作,如下所示:
private final FluxProcessor<Message> processor = UnicastProcessor.create();
private final Flux<Message> messages = processor.publish().autoconnect();
public void add(Message message){
processor.onNext(message);
}
public void flush(){
messages.flatMap(this::doSomeWork)
.doOnNext(System.out::println)
.subscribe();
}
public void flush(int buffer){
messages.take(buffer)
.flatMap(this::doSomeWork)
.subscribe();
}
此代码有效,但它会堆积订阅,因此它会在我每次订阅时打印相同的消息 x 次。我相信这是订阅 Flux
的假定行为,但这不是我想要实现的,而且我似乎无法找到如何做到这一点。
我试图实现的一个例子如下:
(每个数字代表一条新消息被推送到处理器)
--------1-----(*subscribe here*)----2---3---(*subscribe here*)-----4----(*subscribe here*)
输出如下:
1
2
3
4
而不是我得到的:
1
2
3
2
3
4
4
4
关于如何实现此目标的任何提示?
您之所以会得到这样的结果,是因为您没有退订之前的订阅,因此它们仍然有效并仍在处理您的消息。
要退订它们,您可以更改 flush()
方法:
public Disposable flush(){
return messages
.doOnNext(System.out::println)
.subscribe();
}
现在,您可以调用 dispose() 方法,该方法将在不再需要时取消您的订阅。
public void run() {
add(1);
flush().dispose();
System.out.println();
add(2);
add(3);
flush().dispose();
System.out.println();
add(4);
add(5);
add(6);
flush().dispose();
System.out.println();
add(7);
flush().dispose();
}
结果为:
1
2
3
4
5
6
7
我正在尝试使用 Flux
作为某种队列系统。我希望能够将数据推送到 Flux
并稍后订阅。大多数时候我会推送 1 个元素并在之后立即订阅,但在某些情况下我需要等待 2/3 元素并在订阅之前对它们进行一些处理。
我已经设法让它与 UnicastProcessor
和 Flux
一起工作,如下所示:
private final FluxProcessor<Message> processor = UnicastProcessor.create();
private final Flux<Message> messages = processor.publish().autoconnect();
public void add(Message message){
processor.onNext(message);
}
public void flush(){
messages.flatMap(this::doSomeWork)
.doOnNext(System.out::println)
.subscribe();
}
public void flush(int buffer){
messages.take(buffer)
.flatMap(this::doSomeWork)
.subscribe();
}
此代码有效,但它会堆积订阅,因此它会在我每次订阅时打印相同的消息 x 次。我相信这是订阅 Flux
的假定行为,但这不是我想要实现的,而且我似乎无法找到如何做到这一点。
我试图实现的一个例子如下: (每个数字代表一条新消息被推送到处理器)
--------1-----(*subscribe here*)----2---3---(*subscribe here*)-----4----(*subscribe here*)
输出如下:
1
2
3
4
而不是我得到的:
1
2
3
2
3
4
4
4
关于如何实现此目标的任何提示?
您之所以会得到这样的结果,是因为您没有退订之前的订阅,因此它们仍然有效并仍在处理您的消息。
要退订它们,您可以更改 flush()
方法:
public Disposable flush(){
return messages
.doOnNext(System.out::println)
.subscribe();
}
现在,您可以调用 dispose() 方法,该方法将在不再需要时取消您的订阅。
public void run() {
add(1);
flush().dispose();
System.out.println();
add(2);
add(3);
flush().dispose();
System.out.println();
add(4);
add(5);
add(6);
flush().dispose();
System.out.println();
add(7);
flush().dispose();
}
结果为:
1
2
3
4
5
6
7