Stop/Complete a looped CompletableFuture based on some condition

CompletableFuture<String> future = CompletableFuture.completedFuture(null);
for(int _retrial = 1 ; _retrial <= getRetrialCountBasedOnSomeLogic() ; _retrial++) {
  int finalRetrial = _retrial;
  future = future
      .thenCompose(lastRetrialStatus -> {
        if(Strings.isNullOrEmpty(lastRetrialStatus) || (!"SUCCESS".equals(lastRetrialStatus))) {
          return doSomeCalculations(finalRetrial);
        } else {
          return CompletableFuture.completedFuture(null);
        }
      })
      .handle((response, throwable) -> {
        if(throwable != null) {
          throwable = CompletableFutures.unwrapCompletionStateException(throwable);
          return throwable.getMessage();
        }
        return "SUCCESS";
      });
}
return future;

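I recently started working with CompletableFuture. In the code above, as you can see, I am chaining my futures in a for loop. I want the chain to stop as soon as handle returns "SUCCESS". Something like this: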

CompletableFuture<String> future = CompletableFuture.completedFuture(null);
for(int _retrial = 1 ; _retrial <= getRetrialCountBasedOnSomeLogic() ; _retrial++) {
  int finalRetrial = _retrial;
  future = future
      .thenCompose(lastRetrialStatus -> doSomeCalculations(finalRetrial))
      .handle((response, throwable) -> {
        if(throwable != null) {
          throwable = CompletableFutures.unwrapCompletionStateException(throwable);
          return throwable.getMessage();
        }
        // It's a success, so no further passes of this loop are needed.
        // Ideally the chain would end here by returning "SUCCESS".
        return "SUCCESS";
      });
}
return future;

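Just chain the retry operation as part of the handler, which only gets executed once the success state is known. Since a function defined via a lambda expression can't refer to itself, you need a method that you can call, e.g.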

static CompletableFuture<String> yourMethod() {
    return yourMethod(1, getRetrialCountBasedOnSomeLogic());
}

private static CompletableFuture<String> yourMethod(int currentTry, int maxTries) {
    CompletableFuture<String> future = doSomeCalculations(currentTry);
    int nextTry = currentTry + 1;
    return future
        .handle((s, t) -> t == null? CompletableFuture.completedFuture("SUCCESS"):
            nextTry <= maxTries? yourMethod(nextTry, maxTries):
            CompletableFuture.completedFuture(
                CompletableFutures.unwrapCompletionStateException(t).getMessage()))
        .thenCompose(Function.identity());
}

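Unfortunately, there is no variant of handle that composes with another future, so we need to map to a future of a future and then flatten it with .thenCompose(Function.identity()).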
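To make the flattening step concrete, here is a minimal runnable sketch of the same recursion. The class name, the extra succeedOn parameter, the stub doSomeCalculations, and the local unwrap helper are assumptions made for illustration; unwrap merely stands in for CompletableFutures.unwrapCompletionStateException, whose exact behavior is not shown in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class RetryWithHandle {
    // Hypothetical stand-in for doSomeCalculations: fails before attempt `succeedOn`.
    static CompletableFuture<Integer> doSomeCalculations(int currentTry, int succeedOn) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        if (currentTry >= succeedOn) {
            f.complete(currentTry);
        } else {
            f.completeExceptionally(
                new IllegalStateException("try " + currentTry + " failed"));
        }
        return f;
    }

    // Local substitute for the unwrap helper: strips a CompletionException wrapper.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null)
            ? t.getCause() : t;
    }

    public static CompletableFuture<String> retry(int currentTry, int maxTries, int succeedOn) {
        int nextTry = currentTry + 1;
        return doSomeCalculations(currentTry, succeedOn)
            // handle yields a future of a future: CompletableFuture<CompletableFuture<String>>
            .<CompletableFuture<String>>handle((s, t) -> {
                if (t == null) {
                    return CompletableFuture.completedFuture("SUCCESS");
                }
                if (nextTry <= maxTries) {
                    return retry(nextTry, maxTries, succeedOn); // chain the next attempt
                }
                return CompletableFuture.completedFuture(unwrap(t).getMessage());
            })
            // flatten the nested future back to CompletableFuture<String>
            .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        System.out.println(retry(1, 5, 3).join()); // prints "SUCCESS"
        System.out.println(retry(1, 2, 3).join()); // prints "try 2 failed"
    }
}
```

No attempt after the first successful one is ever started, because the next call to retry only happens inside the failure branch of the handler.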

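You can slightly reduce the work for the last attempt by chaining different stages conditionally, since the last attempt does not need composition: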

private static CompletableFuture<String> yourMethod(int currentTry, int maxTries) {
    CompletableFuture<String> future
        = doSomeCalculations(currentTry).thenApply(x -> "SUCCESS");
    int nextTry = currentTry + 1;
    return nextTry <= maxTries?
        future.thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> yourMethod(nextTry, maxTries))
            .thenCompose(Function.identity()):
        future.exceptionally(t ->
            CompletableFutures.unwrapCompletionStateException(t).getMessage());
}
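The conditional variant can be tried out with the sketch below. As before, the class name, the succeedOn parameter, the stub doSomeCalculations, and the local unwrap helper are illustrative assumptions rather than part of the original code. It also shows why unwrapping matters here: exceptionally is attached to a dependent stage, so it receives the original exception wrapped in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class RetryConditional {
    // Hypothetical stand-in for doSomeCalculations: fails before attempt `succeedOn`.
    static CompletableFuture<Integer> doSomeCalculations(int currentTry, int succeedOn) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        if (currentTry >= succeedOn) {
            f.complete(currentTry);
        } else {
            f.completeExceptionally(
                new IllegalStateException("try " + currentTry + " failed"));
        }
        return f;
    }

    // Local substitute for the unwrap helper: strips a CompletionException wrapper.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null)
            ? t.getCause() : t;
    }

    public static CompletableFuture<String> retry(int currentTry, int maxTries, int succeedOn) {
        CompletableFuture<String> future =
            doSomeCalculations(currentTry, succeedOn).thenApply(x -> "SUCCESS");
        int nextTry = currentTry + 1;
        return nextTry <= maxTries
            // Attempts remain: wrap the result, retry on failure, then flatten.
            ? future.<CompletableFuture<String>>thenApply(CompletableFuture::completedFuture)
                    .exceptionally(t -> retry(nextTry, maxTries, succeedOn))
                    .thenCompose(Function.identity())
            // Last attempt: map a failure straight to its message, no composition.
            : future.exceptionally(t -> unwrap(t).getMessage());
    }
}
```

With these stubs, retry(1, 5, 3).join() yields "SUCCESS", while retry(1, 2, 3).join() yields "try 2 failed", the message of the last failed attempt.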

This variant also works if maxTries is not a constant but requires re-evaluating getRetrialCountBasedOnSomeLogic() on every attempt, as your loop does:

static CompletableFuture<String> yourMethod() {
    return yourMethod(1);
}

static CompletableFuture<String> yourMethod(int currentTry) {
    int maxTries = getRetrialCountBasedOnSomeLogic();
    CompletableFuture<String> future
        = doSomeCalculations(currentTry).thenApply(x -> "SUCCESS");
    int nextTry = currentTry + 1;
    return nextTry <= maxTries?
        future.thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> yourMethod(nextTry))
            .thenCompose(Function.identity()):
        future.exceptionally(t ->
            CompletableFutures.unwrapCompletionStateException(t).getMessage());
}
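As a side note, if Java 12 or later is available, CompletableFuture.exceptionallyCompose accepts a function that returns another stage, so the future-of-a-future detour through thenCompose(Function.identity()) is not needed. The following is only a sketch under that assumption; the class name, the succeedOn parameter, the stub doSomeCalculations, and the local unwrap helper are again hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RetryExceptionallyCompose {
    // Hypothetical stand-in for doSomeCalculations: fails before attempt `succeedOn`.
    static CompletableFuture<Integer> doSomeCalculations(int currentTry, int succeedOn) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        if (currentTry >= succeedOn) {
            f.complete(currentTry);
        } else {
            f.completeExceptionally(
                new IllegalStateException("try " + currentTry + " failed"));
        }
        return f;
    }

    // Local substitute for the unwrap helper: strips a CompletionException wrapper.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null)
            ? t.getCause() : t;
    }

    public static CompletableFuture<String> retry(int currentTry, int maxTries, int succeedOn) {
        int nextTry = currentTry + 1;
        return doSomeCalculations(currentTry, succeedOn)
            .thenApply(x -> "SUCCESS")
            // Requires Java 12+: the recovery function returns a stage, not a value.
            .exceptionallyCompose(t -> nextTry <= maxTries
                ? retry(nextTry, maxTries, succeedOn)
                : CompletableFuture.completedFuture(unwrap(t).getMessage()));
    }
}
```

The retry semantics are the same as in the variants above: a success short-circuits the chain, and the message of the last failure is returned once the attempts are exhausted.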