Chain CompletableFuture 并在第一次成功时停止

Chain CompletableFuture and stop on first success

我正在使用 API returns CompletableFuture 来查询设备(类似于 digitalpetri modbus)。

我需要用几个选项来调用这个 API 来查询设备并弄清楚它是什么 - 这基本上是反复试验,直到它成功。这些是我无法更改的嵌入式设备协议,但您可以认为该过程类似于以下工作方式:

  1. 你是苹果吗?
  2. 如果不是,那你是菠萝吗?
  3. 如果不是,那你是钢笔吗?
  4. ...

虽然 API 使用 futures,但实际上,通信是串行的(通过同一条物理线路),因此它们永远不会同步执行。一旦我知道它是什么,我希望能够停止尝试并让调用者知道它是什么。

我已经知道我只能通过 any 获得其中一个期货的结果(见下文),但这可能会导致应该避免的额外尝试。

是否有一种链式期货模式,一旦其中一个成功就停止?

类似,但浪费了非常有限的资源。

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "attempt 1"),
    CompletableFuture.supplyAsync(() -> "attempt 2"),
    CompletableFuture.supplyAsync(() -> "attempt 3"));

CompletableFuture<String>[] futuresArray = (CompletableFuture<String>[]) futures.toArray();
CompletableFuture<Object> c = CompletableFuture.anyOf(futuresArray);

我认为你能做的最好的事情是,在你检索结果后,

futures.forEach(f -> f.cancel(true));

这不影响产生结果的人,尽量阻止其他人。由于 IIUC 你是从外部来源获得它们的,所以不能保证它实际上会打断他们的工作。

然而,由于

this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion

(来自 CompletableFuture 文档),我怀疑它会做你真正想要的。

假设您有一个如您描述的 "pseudo-asynchronous" 方法,即它具有异步 API 但需要一些锁定才能执行:

private final static Object lock = new Object();

private static CompletableFuture<Boolean> pseudoAsyncCall(int input) {
    return CompletableFuture.supplyAsync(() -> {
                synchronized (lock) {
                    System.out.println("Executing for " + input);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return input > 3;
                }
            });
}

还有 List<Integer> 个要根据此方法检查的输入,您可以使用递归组合按顺序检查每个输入:

public static CompletableFuture<Integer> findMatch(List<Integer> inputs) {
    return findMatch(inputs, 0);
}

private static CompletableFuture<Integer> findMatch(List<Integer> inputs, int startIndex) {
    if (startIndex >= inputs.size()) {
        // no match found -- an exception could be thrown here if preferred
        return CompletableFuture.completedFuture(null);
    }
    return pseudoAsyncCall(inputs.get(startIndex))
            .thenCompose(result -> {
                if (result) {
                    return CompletableFuture.completedFuture(inputs.get(startIndex));
                } else {
                    return findMatch(inputs, startIndex + 1);
                }
            });
}

可以这样使用:

public static void main(String[] args) {
    List<Integer> inputs = Arrays.asList(0, 1, 2, 3, 4, 5);
    CompletableFuture<Integer> matching = findMatch(inputs);

    System.out.println("Found match: " + matching.join());
}

输出:

Executing for 0
Executing for 1
Executing for 2
Executing for 3
Executing for 4
Found match: 4

如您所见,它不会被调用用于输入 5,而您的 API (findMatch()) 仍然是异步的。