在另一个完成后优雅地停止无限通量
Gracefully stopping an infinite flux after another has completed
我有两个 Flux
个实例:
- 包含正在执行的任务状态的字符串通量。例如:
Flux.just("pending", "running", "running", "successful");
这种通量是有限的,最终会在任务成功执行时结束。
- 另一个包含任务日志的字符串。当然,日志也是有限的,但是由于与这个问题无关的原因(并且不受我的控制),通量是无限。它会产生有限数量的字符串,但不会产生
completed
信号:
Flux.<Integer> create(emitter -> { // I'm using integers for illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
我想阻塞直到任务完成,同时在后台处理任务日志。任务完成后,我想取消logs flux,这样就不会一直占用资源了。如果它是有限的,我可以很容易地做到这一点:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the time between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Flux.mergeDelayError(1, state.then(), logs.then())
.blockLast();
}
不幸的是,它不是......所以,我解决这个问题的第一个想法是订阅日志流量,然后在状态流量完成后终止订阅:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Disposable logsSubscription = logs.subscribe();
state.doFinally(signalType -> logsSubscription.dispose()).blockLast();
}
这可行,但它不能很好地处理日志缓冲 (.bufferTimeout(30, Duration.ofSeconds(15))
),有时会导致任务日志丢失。例如,两个日志进入缓冲区,但在达到缓冲区限制(或超过超时)之前,日志通量被取消。因此,这两个日志将永远不会得到处理。
我的第二个想法是以某种方式使用 takeUntil(Predicate<? super T> predicate)
来决定何时停止从日志通量中获取元素。然而,这是有问题的,因为只有在 flux 中有新元素时才会调用谓词。这意味着如果任务在最后一个日志发出后完成,则永远不会调用谓词。但是,如果我将无限流量的日志与另一个无限的虚拟对象流合并以确保定期触发 takeUntil
,我可以使用 takeUntil
:
private static boolean completed;
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(2)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.doFinally(signalType -> {
completed = true;
});
Flux<Integer> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)); // Simulate the time between each log message.
Flux<Integer> healthCheck = Flux.<Integer> generate(sink -> {
sink.next(1);
})
.delayElements(Duration.ofSeconds(3));
Flux<List<Integer>> logsWithHealthCheck = Flux.merge(logs, healthCheck)
.takeUntil(i -> completed)
.filter(i -> i != 1)
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logsWithHealthCheck);
mergeDelayError.then()
.block();
}
这工作正常,但似乎有点……老套。有没有更好的方法来完成我想要的?
我能够通过使用 takeUntilOther
并将状态通量变成热源来解决我的问题:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(10)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.replay() // Important
.autoConnect();
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> {
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(5)) // Simulate the time between each log message.
.takeUntilOther(state.then())
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logs);
mergeDelayError.then()
.block();
}
热源部分很重要,否则 .takeUntilOther(state.then())
和 mergeDelayError.then().block()
行会在状态通量上创建两个订阅,这会重复它所做的工作。
我有两个 Flux
个实例:
- 包含正在执行的任务状态的字符串通量。例如:
Flux.just("pending", "running", "running", "successful");
这种通量是有限的,最终会在任务成功执行时结束。 - 另一个包含任务日志的字符串。当然,日志也是有限的,但是由于与这个问题无关的原因(并且不受我的控制),通量是无限。它会产生有限数量的字符串,但不会产生
completed
信号:
Flux.<Integer> create(emitter -> { // I'm using integers for illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
我想阻塞直到任务完成,同时在后台处理任务日志。任务完成后,我想取消logs flux,这样就不会一直占用资源了。如果它是有限的,我可以很容易地做到这一点:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the time between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Flux.mergeDelayError(1, state.then(), logs.then())
.blockLast();
}
不幸的是,它不是......所以,我解决这个问题的第一个想法是订阅日志流量,然后在状态流量完成后终止订阅:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Disposable logsSubscription = logs.subscribe();
state.doFinally(signalType -> logsSubscription.dispose()).blockLast();
}
这可行,但它不能很好地处理日志缓冲 (.bufferTimeout(30, Duration.ofSeconds(15))
),有时会导致任务日志丢失。例如,两个日志进入缓冲区,但在达到缓冲区限制(或超过超时)之前,日志通量被取消。因此,这两个日志将永远不会得到处理。
我的第二个想法是以某种方式使用 takeUntil(Predicate<? super T> predicate)
来决定何时停止从日志通量中获取元素。然而,这是有问题的,因为只有在 flux 中有新元素时才会调用谓词。这意味着如果任务在最后一个日志发出后完成,则永远不会调用谓词。但是,如果我将无限流量的日志与另一个无限的虚拟对象流合并以确保定期触发 takeUntil
,我可以使用 takeUntil
:
private static boolean completed;
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(2)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.doFinally(signalType -> {
completed = true;
});
Flux<Integer> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)); // Simulate the time between each log message.
Flux<Integer> healthCheck = Flux.<Integer> generate(sink -> {
sink.next(1);
})
.delayElements(Duration.ofSeconds(3));
Flux<List<Integer>> logsWithHealthCheck = Flux.merge(logs, healthCheck)
.takeUntil(i -> completed)
.filter(i -> i != 1)
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logsWithHealthCheck);
mergeDelayError.then()
.block();
}
这工作正常,但似乎有点……老套。有没有更好的方法来完成我想要的?
我能够通过使用 takeUntilOther
并将状态通量变成热源来解决我的问题:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(10)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.replay() // Important
.autoConnect();
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> {
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(5)) // Simulate the time between each log message.
.takeUntilOther(state.then())
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logs);
mergeDelayError.then()
.block();
}
热源部分很重要,否则 .takeUntilOther(state.then())
和 mergeDelayError.then().block()
行会在状态通量上创建两个订阅,这会重复它所做的工作。