Akka Streams onFailuresWithBackoff 不重启流程
Akka Streams onFailuresWithBackoff not restarting flow
我正在尝试在 Akka Streams javadsl 中使用 RestartFlow
来重新启动我的流程阶段之一(如果在该阶段发生任何故障),但它似乎并没有重新启动流程,而是只是掉线了消息。
我已经看到了这个:,但我使用的是 2.5.19 版本所以它应该被修复?
RestartFlow.onFailuresWithBackoff
和 RestartFlow.withBackoff
我都试过了,但都没有用。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截异常,这样它就不会从流程中抛出,而且似乎也没有提供我想要的退避和最大重试策略。
流:
public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}
然后是流程:
private Flow<ConsumerMessage.CommittableMessage<String, String>,
ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
.mapAsyncUnordered(
config.getInt(ConfigKeys.PARALLELISM),
msg ->
streamProcessor.process(msg.record().value())
.whenComplete((done, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else {
if (done.status().isSuccess()){
streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
done.toString());
} else {
throw new RuntimeException("HTTP Error!");
}
}
})
.thenApply(done -> msg.committableOffset()));
}
我看到过一次异常,akka 说它会因为失败而重新启动图表,但之后就没有其他了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此消息似乎刚刚被丢弃。
java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
at com.company.app.messageforwarder.StreamingConsumerService.lambda$null[=12=](StreamingConsumerService.java:72)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
... 6 more
如果有人能帮我指出正确的方向,我将不胜感激。
它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但 source/flow 将重新启动,而不会终止整个流。它在 RestartFlow.onFailuresWithBackoff documentation:
中有描述
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.
我正在尝试在 Akka Streams javadsl 中使用 RestartFlow
来重新启动我的流程阶段之一(如果在该阶段发生任何故障),但它似乎并没有重新启动流程,而是只是掉线了消息。
我已经看到了这个:
RestartFlow.onFailuresWithBackoff
和 RestartFlow.withBackoff
我都试过了,但都没有用。我也尝试过使用整个 Actor 系统主管策略,但这似乎只是拦截异常,这样它就不会从流程中抛出,而且似乎也没有提供我想要的退避和最大重试策略。
流:
public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}
然后是流程:
private Flow<ConsumerMessage.CommittableMessage<String, String>,
ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
.mapAsyncUnordered(
config.getInt(ConfigKeys.PARALLELISM),
msg ->
streamProcessor.process(msg.record().value())
.whenComplete((done, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else {
if (done.status().isSuccess()){
streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
done.toString());
} else {
throw new RuntimeException("HTTP Error!");
}
}
})
.thenApply(done -> msg.committableOffset()));
}
我看到过一次异常,akka 说它会因为失败而重新启动图表,但之后就没有其他了。根据我的理解,我应该再看 10 次。消费者继续收听新消息,因此消息似乎刚刚被丢弃。
java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
at com.company.app.messageforwarder.StreamingConsumerService.lambda$null[=12=](StreamingConsumerService.java:72)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
... 6 more
如果有人能帮我指出正确的方向,我将不胜感激。
它的工作方式有点不同。长话短说 - 如果发生错误,消息将被丢弃,但 source/flow 将重新启动,而不会终止整个流。它在 RestartFlow.onFailuresWithBackoff documentation:
中有描述The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.