我可以在 RichAsyncFunction 中编写同步代码吗
Can I write sync code in RichAsyncFunction
当我需要使用 I/O(查询数据库,调用第三个 API,...)时,我可以使用 RichAsyncFunction。但是我需要通过GG Sheet API: https://developers.google.com/sheets/api/quickstart/java与Google Sheet进行互动。这个 API 是同步的。我写了下面的代码片段:
public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
@Override
public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
CompletableFuture.supplyAsync(() -> {
syncSendToGGSheet(message);
return "";
}).thenAccept((String result) -> {
resultFuture.complete(Collections.singleton(result));
});
}
}
但是我发现发给GG的消息Sheet很慢,好像是同步发送的
AsyncIO
用户执行的大部分代码最初是同步的。您只需要确保它实际上是在单独的线程中执行的。最常用的是(静态共享)ExecutorService
。
private class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = Executors.newFixedThreadPool(30);
}
@Override
public void close() throws Exception {
super.close();
executorService.shutdownNow();
}
@Override
public void asyncInvoke(final Obj message, final ResultFuture<String> resultFuture) {
executorService.submit(() -> {
try {
resultFuture.complete(syncSendToGGSheet(message));
} catch (SQLException e) {
resultFuture.completeExceptionally(e);
}
});
}
}
以下是关于如何调整 AsyncIO 以增加吞吐量的一些注意事项:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Async-IO-operator-tuning-micro-benchmarks-td35858.html
当我需要使用 I/O(查询数据库,调用第三个 API,...)时,我可以使用 RichAsyncFunction。但是我需要通过GG Sheet API: https://developers.google.com/sheets/api/quickstart/java与Google Sheet进行互动。这个 API 是同步的。我写了下面的代码片段:
public class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
@Override
public void asyncInvoke(Obj message, final ResultFuture<String> resultFuture) {
CompletableFuture.supplyAsync(() -> {
syncSendToGGSheet(message);
return "";
}).thenAccept((String result) -> {
resultFuture.complete(Collections.singleton(result));
});
}
}
但是我发现发给GG的消息Sheet很慢,好像是同步发送的
AsyncIO
用户执行的大部分代码最初是同步的。您只需要确保它实际上是在单独的线程中执行的。最常用的是(静态共享)ExecutorService
。
private class SendGGSheetFunction extends RichAsyncFunction<Obj, String> {
private transient ExecutorService executorService;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executorService = Executors.newFixedThreadPool(30);
}
@Override
public void close() throws Exception {
super.close();
executorService.shutdownNow();
}
@Override
public void asyncInvoke(final Obj message, final ResultFuture<String> resultFuture) {
executorService.submit(() -> {
try {
resultFuture.complete(syncSendToGGSheet(message));
} catch (SQLException e) {
resultFuture.completeExceptionally(e);
}
});
}
}
以下是关于如何调整 AsyncIO 以增加吞吐量的一些注意事项:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Async-IO-operator-tuning-micro-benchmarks-td35858.html