Java CompletableFuture: only first result

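After reading this article on the Oracle site, https://community.oracle.com/docs/DOC-995305, I'm trying to implement the pattern described in the paragraph "Some Two-to-One Selecting Patterns":

This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once the two upstream elements are completed, the downstream element is executed when one of the two upstream elements is completed. This might prove very useful when we want to resolve a domain name, for instance. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect to have different results from the different servers, so we do not need more answers than the first we get. All the other queries can be safely canceled.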

It is simple to implement a scenario where I have only 2 CompletableFutures, but I cannot implement the same scenario with 3 or more CompletableFutures.

I tried this:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    cf1.applyToEither(
            cf2, s1 -> cf2.applyToEither(
                    cf3, s2 -> cf3.applyToEither(
                            cf4, s3 -> "First result is: " + s3))).thenAccept(System.out::println).join();

FutureMain is my class, and this is the generateString method:

public static String generateString(String input) {
    Random r = new Random();
    int millis = r.nextInt(6) * 1000;
    System.out.println(input + " " + millis);
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return input + ": " + millis;
}

I did manage to combine multiple CompletableFutures for the case where I want all of them to complete:

    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
    CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
    CompletableFuture<String> cf4 = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

    CompletableFuture<String> cf5 = CompletableFuture.allOf(cf1, cf2, cf3, cf4).thenApply(
            s -> elaborate(cf1.join(), cf2.join(), cf3.join(), cf4.join())); 

    cf5.thenAccept(System.out::println).join();

Any suggestions?

The Two-to-One Selecting Pattern says:

the downstream element is executed when one of the two upstream elements is completed.

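For example, suppose a user is selected from two servers. One server returns a user, while the other is blocked or, for some reason, returns a user later. Whichever server returns a user first, the downstream is executed.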
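
As a minimal sketch of that description, two upstream futures can be raced with applyToEither. The server helper, the user names, and the delays below are made up for illustration and are not taken from the article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class TwoToOneSelecting {

    // Hypothetical stand-in for a server that answers after delayMillis.
    static CompletableFuture<String> server(String user, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return user;
        });
    }

    // Completes with the result of whichever upstream future finishes first.
    static String firstUser() {
        CompletableFuture<String> slow = server("Bob", 500);
        CompletableFuture<String> fast = server("Joe", 10);
        return slow.applyToEither(fast, Function.identity()).join();
    }

    public static void main(String[] args) {
        System.out.println(firstUser());
    }
}
```

The downstream function (here just Function.identity()) runs once, with the value of the faster server. The slower future keeps running in the background and its result is ignored.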

Implementing the Two-to-One Selecting Pattern with the Java 8 Stream API:

//the first upstream is always blocked.
CompletableFuture<String> blocked = new CompletableFuture<>();
CompletableFuture<String> upstreams = Stream.of(cf1, cf2, cf3, cf4).reduce(blocked,
        (it, upstream) -> it.applyToEither(upstream, Function.identity()));

upstreams.thenAccept(System.out::println).join();// print "foo"
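
The same reduction can be wrapped in a small reusable helper. This is only a sketch: the class name FirstOfSketch and the method name firstOf are my own, and the futures in the demo are completed by hand instead of running real tasks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class FirstOfSketch {

    // Folds any number of futures into one that completes with the first result.
    // The identity is a future that never completes, so it can never win the race.
    static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
        CompletableFuture<T> never = new CompletableFuture<>();
        return futures.stream().reduce(never,
                (acc, next) -> acc.applyToEither(next, Function.identity()));
    }

    static String demo() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> done = CompletableFuture.completedFuture("foo");
        return firstOf(Arrays.asList(pending, done, pending)).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "foo"
    }
}
```

Because the identity never completes, an empty list yields a future that never completes either, so a caller would probably want to reject empty input or add a timeout.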

Your code should look like this:

I statically import supplyAsync from CompletableFuture so that the code lines stay short enough to print.

CompletableFuture<String> cf1 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf2 = supplyAsync(returnValueLater("bar"));
CompletableFuture<String> cf3 = supplyAsync(blocked(String.class));
CompletableFuture<String> cf4 = supplyAsync(returnValue("foo"));

CompletableFuture<String> upstreams = cf1.applyToEither(cf2, Function.identity())
                                         .applyToEither(cf3, Function.identity())
                                         .applyToEither(cf4, Function.identity());

upstreams.thenAccept(System.out::println).join();// print "foo"

private <T> Supplier<T> returnValue(T value) {
    return returnValue(() -> value);
}

private <T> Supplier<T> blocked(Class<T> type) {
    return returnValue(() -> {
        Thread.currentThread().join();
        return null;
    });
}

private <T> Supplier<T> returnValueLater(T value) {
    return returnValue(() -> {
        Thread.sleep(100);
        return value;
    });
}

private <T> Supplier<T> returnValue(Callable<T> value) {
    return () -> {
        try {
            return value.call();
        } catch (Exception e) { throw new RuntimeException(e); }
    };
}

All CompletableFuture patterns:

import org.junit.jupiter.api.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.*;
import static java.util.Arrays.asList;
import static java.util.concurrent.CompletableFuture.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
public class CompletableFuturePatternTest {

    @Test @DisplayName("Two-to-One Selecting Pattern")
    void selectingManyToOne() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .getFirstResult();

        assertThat(user, equalTo("Joe"));
    }

    @Test @DisplayName("Two-to-One Combining Pattern")
    void combiningManyToOne() throws Throwable {
        List<String> users = select("select user from User", String.class)
                .from(availableServers())
                .list();

        assertThat(users, equalTo(asList("Bob", "Joe", "Doe")));
    }

    @Test @DisplayName("One-to-One Pattern")
    void waitUntilUpstreamCompleted() throws Throwable {
        String user = select("select user from User", String.class)
                .from(availableServers())
                .to(String::toUpperCase);

        assertThat(user, equalTo("JOE"));
    }

    private CompletableFuture<String>[] availableServers() {
        return new CompletableFuture[]{
                server(returnValueLater("Bob")),
                server(returnValue("Joe")),
                server(returnValueLater("Doe")),
        };
    }

    private <T> CompletableFuture<T> server(Supplier<T> supplier) {
        return supplyAsync(supplier);
    }

    private <T> Supplier<T> returnValue(T value) {
        return returnValue(() -> value);
    }


    private <T> Supplier<T> returnValueLater(T value) {
        return returnValue(() -> {
            Thread.sleep(500);
            return value;
        });
    }

    private <T> Supplier<T> returnValue(Callable<T> value) {
        return () -> {
            try {
                return value.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private <T> Query<T> select(String query, Class<T> type) {
        return new Query<T>() {
            private CompletableFuture<T>[] upstreams;

            @Override
            public Query<T> from(CompletableFuture<T>... upstreams) {
                this.upstreams = upstreams;
                return this;
            }

            @Override
            public T getFirstResult() throws Exception {
                return selecting().get();
            }

            @Override
            public <R> R to(Function<T, R> mapper) throws Exception {
                return selecting().thenApply(mapper).get();
            }

            private CompletableFuture<T> selecting() {
                return upstreams(blocked(), this::selecting);
            }

            private CompletableFuture<T> selecting(CompletableFuture<T> primary,
                                                   CompletableFuture<T> upstream) {
                return primary.applyToEitherAsync(upstream, Function.identity());
            }

            private CompletableFuture<T> blocked() {
                return new CompletableFuture<>();
            }

            @Override
            public List<T> list() throws Exception {
                return upstreams(collector(), this::combine, this::combiner).get();
            }

            private CompletableFuture<List<T>> collector() {
                return completedFuture(new ArrayList<>());
            }

            private CompletableFuture<List<T>> combine(CompletableFuture<List<T>> primary,
                                                       CompletableFuture<T> upstream) {
                return primary.thenCombineAsync(upstream, this::concat);
            }

            private List<T> concat(List<T> result, T value) {
                result.add(value);
                return result;
            }

            private CompletableFuture<List<T>> combiner(CompletableFuture<List<T>> primary
                    , CompletableFuture<List<T>> secondary) {

                return primary.thenCombineAsync(secondary, this::concat);
            }

            private <T> List<T> concat(List<T> primary, List<T> secondary) {
                primary.addAll(secondary);
                return primary;
            }

            private CompletableFuture<T> upstreams(CompletableFuture<T> identity,
                                                   BinaryOperator<CompletableFuture<T>> accumulator) {
                return upstreams(identity, accumulator, accumulator);
            }

            private <U> CompletableFuture<U> upstreams(CompletableFuture<U> identity
                    , BiFunction<CompletableFuture<U>, CompletableFuture<T>, CompletableFuture<U>> accumulator
                    , BinaryOperator<CompletableFuture<U>> combiner) {
                return Stream.of(upstreams).reduce(identity, accumulator, combiner);
            }

        };
    }

    interface Query<T> {
        Query<T> from(CompletableFuture<T>... upstreams);

        T getFirstResult() throws Exception;

        <R> R to(Function<T, R> mapper) throws Exception;

        List<T> list() throws Exception;
    }
}

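The "two-to-one" patterns do not scale well to an arbitrary number of futures. That is why convenience methods like allOf and anyOf exist. Since you noticed the former, it is not clear why you ignored the latter: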

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF4"));

CompletableFuture<String> cf5 = CompletableFuture.anyOf(cf1, cf2, cf3, cf4)
    .thenApply(String.class::cast); 

cf5.thenAccept(System.out::println).join();

This does the job of completing as soon as the first future has completed.
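
The article quoted in the question also says that the remaining queries can be cancelled; anyOf does not do that by itself. A minimal sketch of one way to add it, assuming the helper name firstThenCancelRest (my own) and futures that are completed by hand for the demo:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CancelLosers {

    // Completes with the first outcome, then cancels every input that is still pending.
    static <T> CompletableFuture<Object> firstThenCancelRest(List<CompletableFuture<T>> queries) {
        CompletableFuture<Object> first =
                CompletableFuture.anyOf(queries.toArray(new CompletableFuture<?>[0]));
        // cancel(...) has no effect on futures that are already complete.
        return first.whenComplete((value, error) -> queries.forEach(q -> q.cancel(false)));
    }

    static String demo() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // stands in for a slow server
        CompletableFuture<String> fast = CompletableFuture.completedFuture("answer");
        Object result = firstThenCancelRest(Arrays.asList(slow, fast)).join();
        return result + ", slow cancelled: " + slow.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "answer, slow cancelled: true"
    }
}
```

Note that cancelling a CompletableFuture only marks the future as cancelled. It does not interrupt a task that is already running in supplyAsync, so a long-running query would need its own way to stop.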

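The drawback of anyOf is that it always prefers whichever future completes first, regardless of whether it completed exceptionally. An alternative, which completes on the first non-exceptional completion and completes exceptionally only if all futures completed exceptionally, was shown in another answer:
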
public static <T>
    CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {

    CompletableFuture<T> f=new CompletableFuture<>();
    Consumer<T> complete=f::complete;
    CompletableFuture.allOf(
        l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
    ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
    return f;
}

It also makes the type cast obsolete:

CompletableFuture<String> cf1
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF1"));
CompletableFuture<String> cf2
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF2"));
CompletableFuture<String> cf3
    = CompletableFuture.supplyAsync(() -> FutureMain.generateString("CF3"));
CompletableFuture<String> cf4 // to demonstrate that this quick failure is not preferred
    = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); });

CompletableFuture<String> cf5 = anyOf(Arrays.asList(cf1, cf2, cf3, cf4));

cf5.thenAccept(System.out::println).join();
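
For comparison, here is a small sketch of what the built-in CompletableFuture.anyOf does when the first future to finish has failed. The class and method names are made up for the demo, and the futures are completed by hand so the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AnyOfFailureDemo {

    // Describes how the built-in anyOf completes when one input has already failed.
    static String builtInAnyOfOutcome() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<String> pending = new CompletableFuture<>(); // still "running"

        CompletableFuture<Object> any = CompletableFuture.anyOf(pending, failed);
        try {
            return "value: " + any.join();
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException.
            return "failed: " + e.getCause().getMessage();
        } finally {
            pending.complete("late result"); // too late to influence `any`
        }
    }

    public static void main(String[] args) {
        System.out.println(builtInAnyOfOutcome()); // prints "failed: boom"
    }
}
```

With the custom anyOf above, the same two inputs would instead leave the result pending until the second future delivers its value.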