CompletionStage.thenCompose 没有串行执行
CompletionStage.thenCompose not executing serially
我正在尝试使用 java 8 个 CompletionStages 来串行执行 2 个异步方法,以便在第一个失败时不执行第二个。但是当我调用 thenCompose 时,传入的函数似乎在前一个函数完成之前就开始了(例如:这两个函数错误地并行执行。代码如下:
public CompletionStage<Graph> create(Payload payload) {
CompletionStage<BlobInfo> fileFuture = createFile(payload);
CompletionStage<Entity> metadataFuture = createMetadata(payload);
return fileFuture
.thenCompose(ignore -> metadataFuture)
.thenApply(entity ->
buildFromEntity(objectMapper, entity));
}
public CompletionStage<BlobInfo> createFile(Payload payload) {
return CompletableFuture.supplyAsync(() -> {
try {
return
storage.create(
BlobInfo
.newBuilder(payload.bucket, payload.name)
.build(),
payload.data.getBytes());
} catch (StorageException e) {
LOG.error("Failed to write to storage: " + e);
throw new RequestHandlerException(StatusCode.SERVER_ERROR,
"Failed to write to storage.");
}
});
}
public CompletionStage<Entity> createMetadata(Payload payload) {
return CompletableFuture.supplyAsync(() -> createSync(payload));
}
private Entity createMetadataSync(Payload payload) {
Key key = keyFactory.newKey(payload.id);
Entity.Builder entityBuilder = GraphPayload.buildEntityFromGraph(payload, key);
Entity entity = entityBuilder.build();
LOG.error("Metadata.createSync");
try {
datastore.add(entity);
} catch (DatastoreException e) {
LOG.error("Failed to write initial metadata: " + e);
throw new RequestHandlerException(StatusCode.SERVER_ERROR,
"Failed to write initial metadata.");
}
return entity;
}
输出:
16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - CreateFile
16:57:47.530 [ForkJoinPool.commonPool-worker-2] ERROR com.spotify.nfgraphstore.store.MetadataStore - Metadata.createSync
16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - Failed to write initial graph to storage: com.google.cloud.storage.StorageException: X
记录的输出表明 Metadata.createSync 在存储异常被抛出之前被执行。如果对文件存储 DB 的写入失败,该结论也由一项测试(未显示)得出,该测试应该显示与元数据 DB 的零交互。该测试有时会失败,表明存在竞争条件。
所以我认为 thenCompose 不能保证串行执行。但是我在 java 文档中阅读的所有内容都表明执行应该是串行的:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenCompose-java.util.function.Function-
有谁知道为什么不能保证串行执行,或者推荐其他可能更符合我预期的功能?
调用 createMetadata
会立即启动任务,因为它不是作为传递给 thenCompose
.
的 lambda 表达式的一部分调用的
也许你打算这样做:
.thenCompose(ignore -> createMetadata(payload))
我正在尝试使用 java 8 个 CompletionStages 来串行执行 2 个异步方法,以便在第一个失败时不执行第二个。但是当我调用 thenCompose 时,传入的函数似乎在前一个函数完成之前就开始了(例如:这两个函数错误地并行执行。代码如下:
public CompletionStage<Graph> create(Payload payload) {
CompletionStage<BlobInfo> fileFuture = createFile(payload);
CompletionStage<Entity> metadataFuture = createMetadata(payload);
return fileFuture
.thenCompose(ignore -> metadataFuture)
.thenApply(entity ->
buildFromEntity(objectMapper, entity));
}
public CompletionStage<BlobInfo> createFile(Payload payload) {
return CompletableFuture.supplyAsync(() -> {
try {
return
storage.create(
BlobInfo
.newBuilder(payload.bucket, payload.name)
.build(),
payload.data.getBytes());
} catch (StorageException e) {
LOG.error("Failed to write to storage: " + e);
throw new RequestHandlerException(StatusCode.SERVER_ERROR,
"Failed to write to storage.");
}
});
}
public CompletionStage<Entity> createMetadata(Payload payload) {
return CompletableFuture.supplyAsync(() -> createSync(payload));
}
private Entity createMetadataSync(Payload payload) {
Key key = keyFactory.newKey(payload.id);
Entity.Builder entityBuilder = GraphPayload.buildEntityFromGraph(payload, key);
Entity entity = entityBuilder.build();
LOG.error("Metadata.createSync");
try {
datastore.add(entity);
} catch (DatastoreException e) {
LOG.error("Failed to write initial metadata: " + e);
throw new RequestHandlerException(StatusCode.SERVER_ERROR,
"Failed to write initial metadata.");
}
return entity;
}
输出:
16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - CreateFile
16:57:47.530 [ForkJoinPool.commonPool-worker-2] ERROR com.spotify.nfgraphstore.store.MetadataStore - Metadata.createSync
16:57:47.530 [ForkJoinPool.commonPool-worker-3] ERROR com.spotify.nfgraphstore.store.FileStore - Failed to write initial graph to storage: com.google.cloud.storage.StorageException: X
记录的输出表明 Metadata.createSync 在存储异常被抛出之前被执行。如果对文件存储 DB 的写入失败,该结论也由一项测试(未显示)得出,该测试应该显示与元数据 DB 的零交互。该测试有时会失败,表明存在竞争条件。
所以我认为 thenCompose 不能保证串行执行。但是我在 java 文档中阅读的所有内容都表明执行应该是串行的:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenCompose-java.util.function.Function-
有谁知道为什么不能保证串行执行,或者推荐其他可能更符合我预期的功能?
调用 createMetadata
会立即启动任务,因为它不是作为传递给 thenCompose
.
也许你打算这样做:
.thenCompose(ignore -> createMetadata(payload))