如何执行 CompletableFuture 数组并组合它们的结果
How to execute an Array of CompletableFuture and compose their results
我正在研究 Java 8 的 CompletableFuture,
并且读到(也看到)应该使用 thenCompose
而不是 thenApply
。
我已将代码改为使用 thenCompose,
但总觉得这样做不太对。
这是我的控制代码...
// Upstream stage: the list of element strings obtained from the HTML page source.
final CompletableFuture<List<String>> extractor = get(htmlPageSource);

// One slot per endpoint in ENDPOINT.EXTRACTABLES. Generic arrays cannot be created
// directly, hence the raw array creation and the unchecked suppression.
@SuppressWarnings("unchecked")
final CompletableFuture<List<Documentable>>[] completableFutures =
        new CompletableFuture[ENDPOINT.EXTRACTABLES.size()];

int slot = 0;
for (final ENDPOINT endpoint : ENDPOINT.EXTRACTABLES) {
    // Each endpoint is contacted on executorService once the extractor completes;
    // a failure is replaced by the list returned from failedList(...).
    completableFutures[slot] = extractor
            .<List<Documentable>>thenComposeAsync(
                    elements -> endpoint.contactEndpoit(elements), executorService)
            .exceptionally(cause -> failedList(cause));
    slot++;
}

// Wait for every per-endpoint future, then combine, hand the result to finish(...),
// and log anything that still failed along the way.
final CompletableFuture<Void> allEndpointsDone = CompletableFuture.allOf(completableFutures);
allEndpointsDone
        .thenComposeAsync(ignored -> combineDocuments(completableFutures))
        .thenAccept(combined -> finish(combined))
        .exceptionally(cause -> failed(cause));
/**
 * Logs the given failure and substitutes an empty result for it.
 *
 * @param x the exception to log
 * @return a new, empty, mutable list
 */
private List<Documentable> failedList(final Throwable x) {
    LOGGER.error("failedList", x);
    return new ArrayList<>();
}
/**
 * Logs the given failure and yields no value.
 *
 * @param x the exception to log
 * @return always {@code null}, the only value a {@code Void} result can take
 */
private Void failed(final Throwable x) {
LOGGER.error("failed", x);
return null;
}
我认为这部分还可以接受,
然而让我不安的是下面这段代码:
WWW_SITE_ONE("https://example.site.one/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response with {@code OBJECT_READER_SITE_ONE}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. Responses whose
     * body contains the text {@code "errors"} are skipped. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of deserialized documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_ONE " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_ONE.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                if (!body.contains("errors")) {
                    final T document = OBJECT_READER_SITE_ONE.readValue(body);
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_ONE failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
},
WWW_SITE_TWO("https://example.site.two/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response with {@code OBJECT_READER_SITE_TWO}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. A response whose
     * body is exactly {@code "Resource not found."} is skipped. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of deserialized documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_TWO " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_TWO.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                if (!body.equals("Resource not found.")) {
                    final T document = OBJECT_READER_SITE_TWO.readValue(body);
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_TWO failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
},
WWW_SITE_THREE("https://example.site.three/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response into a {@code SITE_THREE} with {@code OBJECT_READER_SITE_THREE}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. Only documents
     * whose {@code getHitCount()} is greater than zero are kept. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of retained documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_THREE " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_THREE.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                final SITE_THREE parsed = OBJECT_READER_SITE_THREE.readValue(body);
                // Unchecked: the caller chooses T, the reader always produces SITE_THREE.
                final T document = (T) parsed;
                final boolean hasHits = parsed.getHitCount() > 0;
                if (hasHits) {
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_THREE failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
};
我返回的是
CompletableFuture.supplyAsync(() -> SITE_THREEs);
这是正确的做法吗?
还是说,这会另外启动一个异步线程,仅仅为了返回我的 List<>?
顾名思义,supplyAsync
会执行异步操作:它在后台线程中执行 Supplier
的 get()
方法,也就是 lambda 表达式的主体,无论这段代码多么简单。由于 supplyAsync
的实现无法检查 Supplier
所封装的代码有多简单,因此它只能以这种方式工作。
不要使用 CompletableFuture.supplyAsync(() -> SITE_THREEs)
,而应该使用 CompletableFuture.completedFuture(SITE_THREEs)
,它返回一个已经以该结果完成的 future,因此不需要任何额外的操作。
如果该方法只会返回已完成的阶段或抛出异常,您也可以把它改为直接返回结果值而不是 CompletionStage
,并使用 thenApply
而不是 thenCompose
,从而简化代码——除非您想保留在该方法的未来版本中引入异步操作的可能性。
我正在研究 Java 8 的 CompletableFuture,
并且读到(也看到)应该使用 thenCompose
而不是 thenApply
。
我已将代码改为使用 thenCompose,
但总觉得这样做不太对。
这是我的控制代码...
// Upstream stage: the list of element strings obtained from the HTML page source.
final CompletableFuture<List<String>> extractor = get(htmlPageSource);

// One slot per endpoint in ENDPOINT.EXTRACTABLES. Generic arrays cannot be created
// directly, hence the raw array creation and the unchecked suppression.
@SuppressWarnings("unchecked")
final CompletableFuture<List<Documentable>>[] completableFutures =
        new CompletableFuture[ENDPOINT.EXTRACTABLES.size()];

int slot = 0;
for (final ENDPOINT endpoint : ENDPOINT.EXTRACTABLES) {
    // Each endpoint is contacted on executorService once the extractor completes;
    // a failure is replaced by the list returned from failedList(...).
    completableFutures[slot] = extractor
            .<List<Documentable>>thenComposeAsync(
                    elements -> endpoint.contactEndpoit(elements), executorService)
            .exceptionally(cause -> failedList(cause));
    slot++;
}

// Wait for every per-endpoint future, then combine, hand the result to finish(...),
// and log anything that still failed along the way.
final CompletableFuture<Void> allEndpointsDone = CompletableFuture.allOf(completableFutures);
allEndpointsDone
        .thenComposeAsync(ignored -> combineDocuments(completableFutures))
        .thenAccept(combined -> finish(combined))
        .exceptionally(cause -> failed(cause));
/**
 * Logs the given failure and substitutes an empty result for it.
 *
 * @param x the exception to log
 * @return a new, empty, mutable list
 */
private List<Documentable> failedList(final Throwable x) {
    LOGGER.error("failedList", x);
    return new ArrayList<>();
}
/**
 * Logs the given failure and yields no value.
 *
 * @param x the exception to log
 * @return always {@code null}, the only value a {@code Void} result can take
 */
private Void failed(final Throwable x) {
LOGGER.error("failed", x);
return null;
}
我认为这部分还可以接受,
然而让我不安的是下面这段代码:
WWW_SITE_ONE("https://example.site.one/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response with {@code OBJECT_READER_SITE_ONE}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. Responses whose
     * body contains the text {@code "errors"} are skipped. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of deserialized documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_ONE " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_ONE.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                if (!body.contains("errors")) {
                    final T document = OBJECT_READER_SITE_ONE.readValue(body);
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_ONE failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
},
WWW_SITE_TWO("https://example.site.two/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response with {@code OBJECT_READER_SITE_TWO}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. A response whose
     * body is exactly {@code "Resource not found."} is skipped. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of deserialized documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_TWO " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_TWO.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                if (!body.equals("Resource not found.")) {
                    final T document = OBJECT_READER_SITE_TWO.readValue(body);
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_TWO failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
},
WWW_SITE_THREE("https://example.site.three/") {
    /**
     * Requests one JSON document per element from this endpoint and deserializes
     * each response into a {@code SITE_THREE} with {@code OBJECT_READER_SITE_THREE}.
     *
     * <p>The HTTP requests run synchronously on the calling thread. Only documents
     * whose {@code getHitCount()} is greater than zero are kept. Any failure is logged
     * and rethrown wrapped in a {@link RuntimeException}, which aborts the loop.
     *
     * @param elements values appended to the base URL, one request per value
     * @return a stage supplying the list of retained documents
     */
    @Override
    public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
        LOGGER.info("WWW_SITE_THREE " + Thread.currentThread().getName());
        final List<T> documents = new ArrayList<>();
        for (final String element : elements) {
            try {
                final String url = ENDPOINT.WWW_SITE_THREE.getBaseUrl() + element;
                final String body = Jsoup.connect(url)
                        .ignoreContentType(true)
                        .ignoreHttpErrors(true)
                        .maxBodySize(0)
                        .timeout(60000)
                        .execute()
                        .body();
                final SITE_THREE parsed = OBJECT_READER_SITE_THREE.readValue(body);
                // Unchecked: the caller chooses T, the reader always produces SITE_THREE.
                final T document = (T) parsed;
                final boolean hasHits = parsed.getHitCount() > 0;
                if (hasHits) {
                    documents.add(document);
                }
            } catch (final Throwable e) {
                LOGGER.error("WWW_SITE_THREE failed", e);
                throw new RuntimeException(e);
            }
        }
        // NOTE(review): the list is already fully built here; supplyAsync still schedules
        // this trivial supplier on another thread. CompletableFuture.completedFuture(documents)
        // would avoid that extra hop - confirm before changing.
        return CompletableFuture.supplyAsync(() -> documents);
    }
};
我返回的是
CompletableFuture.supplyAsync(() -> SITE_THREEs);
这是正确的做法吗?
还是说,这会另外启动一个异步线程,仅仅为了返回我的 List<>?
顾名思义,supplyAsync
会执行异步操作:它在后台线程中执行 Supplier
的 get()
方法,也就是 lambda 表达式的主体,无论这段代码多么简单。由于 supplyAsync
的实现无法检查 Supplier
所封装的代码有多简单,因此它只能以这种方式工作。
不要使用 CompletableFuture.supplyAsync(() -> SITE_THREEs)
,而应该使用 CompletableFuture.completedFuture(SITE_THREEs)
,它返回一个已经以该结果完成的 future,因此不需要任何额外的操作。
如果该方法只会返回已完成的阶段或抛出异常,您也可以把它改为直接返回结果值而不是 CompletionStage
,并使用 thenApply
而不是 thenCompose
,从而简化代码——除非您想保留在该方法的未来版本中引入异步操作的可能性。