在应用 运行 期间将 events/values 动态推送到 Flux
Dynamically push events/values to a Flux during application run
我正在尝试使用 Java 和 project-reactor 创建反应管道,其中用例是应用程序在不同级别生成流状态(INIT、PROCESSING、SAVED、DONE)。状态必须异步发送到需要独立于主流处理的通量。我遇到了这个 link:
我的示例流程是这样的:
public class StatusEmitterImpl implements StatusEmitter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public StatusEmitterImpl() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = processor.sink();
}
@Override
public Flux<String> publisher() {
return this.processor.map(x -> x);
}
public void publishStatus(String status) {
sink.next(status);
}
}
public class Try {
public static void main(String[] args) {
StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x ->
statusEmitter.publishStatus(x));
statusEmitter.publisher().subscribe(x -> System.out.println(x));
}
}
问题是控制台上没有打印任何内容。我不明白我错过了什么。
DirectProcessor
直接将值传递给其注册的 Subscribers
,而不缓存信号。如果没有Subscriber
,则值为"forgotten"。如果 Subscriber
迟到,那么它只会收到在它订阅 之后 发出的信号。
这就是这里发生的事情:因为 fromIterable
在内存中的集合上工作,所以它有时间将所有值推送到 DirectProcessor
,到那时它还没有注册Subscriber
还没有。
如果你反转最后两行,你应该会看到一些东西。
DirectProcessor 是热发布器,不缓冲元素,因此您应该在其 subscribe.like 是
之后生成元素
public static void main(String[] args) {
StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
statusEmitter.publisherA().subscribe(x -> System.out.println(x));
Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> statusEmitter.publishStatus(x));
}
,或使用 EmitterProcessor、UnicastProcessor 代替 DirectProcessor。
我正在尝试使用 Java 和 project-reactor 创建反应管道,其中用例是应用程序在不同级别生成流状态(INIT、PROCESSING、SAVED、DONE)。状态必须异步发送到需要独立于主流处理的通量。我遇到了这个 link:
我的示例流程是这样的:
public class StatusEmitterImpl implements StatusEmitter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public StatusEmitterImpl() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = processor.sink();
}
@Override
public Flux<String> publisher() {
return this.processor.map(x -> x);
}
public void publishStatus(String status) {
sink.next(status);
}
}
public class Try {
public static void main(String[] args) {
StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x ->
statusEmitter.publishStatus(x));
statusEmitter.publisher().subscribe(x -> System.out.println(x));
}
}
问题是控制台上没有打印任何内容。我不明白我错过了什么。
DirectProcessor
直接将值传递给其注册的 Subscribers
,而不缓存信号。如果没有Subscriber
,则值为"forgotten"。如果 Subscriber
迟到,那么它只会收到在它订阅 之后 发出的信号。
这就是这里发生的事情:因为 fromIterable
在内存中的集合上工作,所以它有时间将所有值推送到 DirectProcessor
,到那时它还没有注册Subscriber
还没有。
如果你反转最后两行,你应该会看到一些东西。
DirectProcessor 是热发布器,不缓冲元素,因此您应该在其 subscribe.like 是
之后生成元素public static void main(String[] args) {
StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
statusEmitter.publisherA().subscribe(x -> System.out.println(x));
Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> statusEmitter.publishStatus(x));
}
,或使用 EmitterProcessor、UnicastProcessor 代替 DirectProcessor。