如何等待反应流完成,类似于Thread.join? (没有 blockLast 或主动等待)

How to wait for reactive stream completion, similar to Thread.join? (without blockLast or active waiting)

我有反应流,但在 CLI 应用程序中,那个反应流立即终止,我找不到任何可接受的解决方案。

让我们从简单的例子开始。 CLI 应用程序,我们创建“长”运行ning 守护进程线程来模拟我在反应流中遇到的类似情况:

public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            System.out.println("Start.");
            Thread.sleep(10000);
            System.out.println("Well done!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    thread.setDaemon(true);
    thread.start();
}

如果我们 运行 这样做,应用程序会立即终止,因为当主线程完成时没有非守护线程 运行ning。这个例子很容易修复:

public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            System.out.println("Start.");
            Thread.sleep(10000);
            System.out.println("Well done!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    thread.setDaemon(true);
    thread.start();
    try {
        thread.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

现在它“做得很好”,至少打印出来的是这样。现在让我们转向反应流。以下立即终止:

public static void main(String[] args) {
    Flux.range(1, 10)
            .zipWith(Flux.interval(Duration.ofSeconds(1)))
            .map(Tuple2::getT1)
            .subscribe(System.out::println);
}

好的,我可以这样“解决”这个问题:

public static void main(String[] args) {
    Flux.range(1, 10)
            .zipWith(Flux.interval(Duration.ofSeconds(1)))
            .map(Tuple2::getT1)
            .doOnNext(System.out::println)
            .blockLast();
}

}

但不想使用副作用方法doOnNextblockLast,我觉得不太好,我现在没有可能使用errorConsumercompleteConsumersubscribe 方法中可用。我看到了以下推荐:

public static void main(String[] args) {
    Disposable disposable = Flux.range(1, 10)
            .zipWith(Flux.interval(Duration.ofSeconds(1)))
            .map(Tuple2::getT1)
            .subscribe(System.out::println);

    while (!disposable.isDisposed()) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

虽然使用一次性的很好,但主动等待不是。

此处使用订阅并以某种方式加入“反应线程”的正确解决方案是什么?或者,如果 subscribe 不是可行的方法,我可以以某种方式(对所有 3 个消费者)以不同的方式重新表述它,那么如何?

您所描述的是您不想使用 reactor 的阻塞机制,而是实现一个自定义的阻塞机制,它几乎可以做同样的事情。

就用blockLast(),完全适用于这种情况。

但是,如果您的应用程序比您的示例更复杂,只需确保您真的只在该主线程上阻塞一次!