如何等待反应流完成,类似于Thread.join? (没有 blockLast 或主动等待)
How to wait for reactive stream completion, similar to Thread.join? (without blockLast or active waiting)
我有反应流,但在 CLI 应用程序中,那个反应流立即终止,我找不到任何可接受的解决方案。
让我们从简单的例子开始。 CLI 应用程序,我们创建“长”运行ning 守护进程线程来模拟我在反应流中遇到的类似情况:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
System.out.println("Start.");
Thread.sleep(10000);
System.out.println("Well done!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.setDaemon(true);
thread.start();
}
如果我们 运行 这样做,应用程序会立即终止,因为当主线程完成时没有非守护线程 运行ning。这个例子很容易修复:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
System.out.println("Start.");
Thread.sleep(10000);
System.out.println("Well done!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.setDaemon(true);
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
现在它“做得很好”,至少打印出来的是这样。现在让我们转向反应流。以下立即终止:
public static void main(String[] args) {
Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.subscribe(System.out::println);
}
好的,我可以这样“解决”这个问题:
public static void main(String[] args) {
Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.doOnNext(System.out::println)
.blockLast();
}
}
但不想使用副作用方法doOnNext
或blockLast
,我觉得不太好,我现在没有可能使用errorConsumer
和 completeConsumer
在 subscribe
方法中可用。我看到了以下推荐:
public static void main(String[] args) {
Disposable disposable = Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.subscribe(System.out::println);
while (!disposable.isDisposed()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
虽然使用一次性的很好,但主动等待不是。
此处使用订阅并以某种方式加入“反应线程”的正确解决方案是什么?或者,如果 subscribe
不是可行的方法,我可以以某种方式(对所有 3 个消费者)以不同的方式重新表述它,那么如何?
您所描述的是您不想使用 reactor 的阻塞机制,而是实现一个自定义的阻塞机制,它几乎可以做同样的事情。
就用blockLast()
,完全适用于这种情况。
但是,如果您的应用程序比您的示例更复杂,只需确保您真的只在该主线程上阻塞一次!
我有反应流,但在 CLI 应用程序中,那个反应流立即终止,我找不到任何可接受的解决方案。
让我们从简单的例子开始。 CLI 应用程序,我们创建“长”运行ning 守护进程线程来模拟我在反应流中遇到的类似情况:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
System.out.println("Start.");
Thread.sleep(10000);
System.out.println("Well done!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.setDaemon(true);
thread.start();
}
如果我们 运行 这样做,应用程序会立即终止,因为当主线程完成时没有非守护线程 运行ning。这个例子很容易修复:
public static void main(String[] args) {
Thread thread = new Thread(() -> {
try {
System.out.println("Start.");
Thread.sleep(10000);
System.out.println("Well done!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.setDaemon(true);
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
现在它“做得很好”,至少打印出来的是这样。现在让我们转向反应流。以下立即终止:
public static void main(String[] args) {
Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.subscribe(System.out::println);
}
好的,我可以这样“解决”这个问题:
public static void main(String[] args) {
Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.doOnNext(System.out::println)
.blockLast();
}
}
但不想使用副作用方法doOnNext
或blockLast
,我觉得不太好,我现在没有可能使用errorConsumer
和 completeConsumer
在 subscribe
方法中可用。我看到了以下推荐:
public static void main(String[] args) {
Disposable disposable = Flux.range(1, 10)
.zipWith(Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1)
.subscribe(System.out::println);
while (!disposable.isDisposed()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
虽然使用一次性的很好,但主动等待不是。
此处使用订阅并以某种方式加入“反应线程”的正确解决方案是什么?或者,如果 subscribe
不是可行的方法,我可以以某种方式(对所有 3 个消费者)以不同的方式重新表述它,那么如何?
您所描述的是您不想使用 reactor 的阻塞机制,而是实现一个自定义的阻塞机制,它几乎可以做同样的事情。
就用blockLast()
,完全适用于这种情况。
但是,如果您的应用程序比您的示例更复杂,只需确保您真的只在该主线程上阻塞一次!