在应用 运行 期间将 events/values 动态推送到 Flux

Dynamically push events/values to a Flux during application run

我正在尝试使用 Java 和 project-reactor 创建反应管道,其中用例是应用程序在不同级别生成流状态(INIT、PROCESSING、SAVED、DONE)。状态必须异步发送到需要独立于主流处理的通量。我遇到了这个 link:

我的示例流程是这样的:

public class StatusEmitterImpl implements StatusEmitter {

    private final FluxProcessor<String, String> processor;
    private final FluxSink<String> sink;

    public StatusEmitterImpl() {
        this.processor = DirectProcessor.<String>create().serialize();
        this.sink = processor.sink();
    }

    @Override
    public Flux<String> publisher() {
        return this.processor.map(x -> x);
    }

    public void publishStatus(String status) {
        sink.next(status);
    }
}

public class Try {

    public static void main(String[] args) {

    StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
    Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> 
        statusEmitter.publishStatus(x));
    statusEmitter.publisher().subscribe(x -> System.out.println(x));
    }
}

问题是控制台上没有打印任何内容。我不明白我错过了什么。

DirectProcessor 直接将值传递给其注册的 Subscribers,而不缓存信号。如果没有Subscriber,则值为"forgotten"。如果 Subscriber 迟到,那么它只会收到在它订阅 之后 发出的信号。

这就是这里发生的事情:因为 fromIterable 在内存中的集合上工作,所以它有时间将所有值推送到 DirectProcessor,到那时它还没有注册Subscriber还没有。

如果你反转最后两行,你应该会看到一些东西。

DirectProcessor 是热发布器,不缓冲元素,因此您应该在其 subscribe.like 是

之后生成元素
public static void main(String[] args) {

        StatusEmitterImpl statusEmitter = new StatusEmitterImpl();
        statusEmitter.publisherA().subscribe(x -> System.out.println(x));
        Flux.fromIterable(Arrays.asList("INIT", "DONE")).subscribe(x -> statusEmitter.publishStatus(x));
    }

,或使用 EmitterProcessor、UnicastProcessor 代替 DirectProcessor。