私有以太坊区块链中循环构造的 CompletableFuture
CompleteableFuture in a loop contruct in a private Ethereum Blockchain
我有一个私人以太坊区块链,上面有 5 台机器在上面挖矿。区块链的大小 [块数] 目前为 300。处理在后端完成 Java.
我需要运行以异步方式构建以下循环。循环的瓶颈是在执行以下命令期间:
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
该命令还可以 return 一个 Completablefuture<EthBlock>
对象,方法是用这里给出的 supplyAsync()
结束它 https://github.com/web3j/web3j#start-sending-requests 只需调用 supplyAync().get()
就可以删除并行性方面并使它同步运行。
public void businessLogic() throws Exception {
recentBlocks = new ArrayList<EthBlock.Block>();
for (long i = 1; i <= 300000; i++) {
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
if (eb == null || eb.getBlock() == null) {
continue;
}
EthBlock.Block block = eb.getBlock();
recentBlocks.add(block);
}
}
我无法理解将代码转换为 CompleteableFuture 可以运行的方式的机制。目标是 'group' 将对 web.ethGetBlockNumber(...).supplyAync()
的多次调用合并到一个集合中,并一次调用它们来更新一个数组,该数组将被 EthBlock
个对象填充,即 recentBlocks
。
这是我想出的:
public void businessLogic() throws Exception {
recentBlocks = new ArrayList<EthBlock.Block>();
List<CompleteableFuture> compFutures = new ArrayList<>();
for (long i = 0, i <= 300000, i++){
CompleteableFuture<EthBlock> compFuture = eb3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync();
compFuture.thenAcceptAsync(eb -> // Doesn't look right
EthBlock.Block block = eb.getBlock();
recentBlock.add(block);)
compFutures.add(compFuture);
}
CompleteableFuture.allOf(compFutures).get();
}
实施 IntStream
long start = System.nanoTime();
recentBlocks = IntStream.rangeClosed(0, 300_000)
.parallel()
.mapToObj(i -> {
try {
System.out.println("Current Thread -> " + Thread.currentThread());
return web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.map(EthBlock::getBlock)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long stop = System.nanoTime();
System.out.println("Time Elapsed: " + TimeUnit.MICROSECONDS.convert(stop-start, TimeUnit.NANOSECONDS));
CompletableFuture 包含 get 的覆盖:
get(long timeout, TimeUnit unit)
。如果在特定时间内没有return,您可以通过使获取超时来使用它进行轮询。
假设结果 List
的顺序不重要,您也许可以从并行流中获益而不是依赖 CompletableFuture
:
IntStream.rangeClosed(0, 300_000)
.parallel()
.mapToObj(i -> web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send())
.filter(Objects::nonNull)
.map(EthBlock::getBlock)
.filter(Objects::nonNull)
.collect(Collectors.toList());
因为您说这没有帮助,让我们尝试使用缓存线程池的 ExecutorService
:
List<EthBlock.Block> blocks = Collections.synchronizedList(new ArrayList<>(300_000));
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i <= 300_000; i++) {
BigInteger number = BigInteger.valueOf(i);
service.execute(() -> {
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(number), true).send();
if (eb == null) {
return;
}
EthBlock.Block block = eb.getBlock();
if (block != null) {
blocks.add(block);
}
});
}
我有一个私人以太坊区块链,上面有 5 台机器在上面挖矿。区块链的大小 [块数] 目前为 300。处理在后端完成 Java.
我需要运行以异步方式构建以下循环。循环的瓶颈是在执行以下命令期间:
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
该命令还可以 return 一个 Completablefuture<EthBlock>
对象,方法是用这里给出的 supplyAsync()
结束它 https://github.com/web3j/web3j#start-sending-requests 只需调用 supplyAync().get()
就可以删除并行性方面并使它同步运行。
public void businessLogic() throws Exception {
recentBlocks = new ArrayList<EthBlock.Block>();
for (long i = 1; i <= 300000; i++) {
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
if (eb == null || eb.getBlock() == null) {
continue;
}
EthBlock.Block block = eb.getBlock();
recentBlocks.add(block);
}
}
我无法理解将代码转换为 CompleteableFuture 可以运行的方式的机制。目标是 'group' 将对 web.ethGetBlockNumber(...).supplyAync()
的多次调用合并到一个集合中,并一次调用它们来更新一个数组,该数组将被 EthBlock
个对象填充,即 recentBlocks
。
这是我想出的:
public void businessLogic() throws Exception {
recentBlocks = new ArrayList<EthBlock.Block>();
List<CompleteableFuture> compFutures = new ArrayList<>();
for (long i = 0, i <= 300000, i++){
CompleteableFuture<EthBlock> compFuture = eb3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync();
compFuture.thenAcceptAsync(eb -> // Doesn't look right
EthBlock.Block block = eb.getBlock();
recentBlock.add(block);)
compFutures.add(compFuture);
}
CompleteableFuture.allOf(compFutures).get();
}
实施 IntStream
long start = System.nanoTime();
recentBlocks = IntStream.rangeClosed(0, 300_000)
.parallel()
.mapToObj(i -> {
try {
System.out.println("Current Thread -> " + Thread.currentThread());
return web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.map(EthBlock::getBlock)
.filter(Objects::nonNull)
.collect(Collectors.toList());
long stop = System.nanoTime();
System.out.println("Time Elapsed: " + TimeUnit.MICROSECONDS.convert(stop-start, TimeUnit.NANOSECONDS));
CompletableFuture 包含 get 的覆盖:
get(long timeout, TimeUnit unit)
。如果在特定时间内没有return,您可以通过使获取超时来使用它进行轮询。
假设结果 List
的顺序不重要,您也许可以从并行流中获益而不是依赖 CompletableFuture
:
IntStream.rangeClosed(0, 300_000)
.parallel()
.mapToObj(i -> web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send())
.filter(Objects::nonNull)
.map(EthBlock::getBlock)
.filter(Objects::nonNull)
.collect(Collectors.toList());
因为您说这没有帮助,让我们尝试使用缓存线程池的 ExecutorService
:
List<EthBlock.Block> blocks = Collections.synchronizedList(new ArrayList<>(300_000));
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i <= 300_000; i++) {
BigInteger number = BigInteger.valueOf(i);
service.execute(() -> {
EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(number), true).send();
if (eb == null) {
return;
}
EthBlock.Block block = eb.getBlock();
if (block != null) {
blocks.add(block);
}
});
}