私有以太坊区块链中循环构造的 CompletableFuture

CompleteableFuture in a loop contruct in a private Ethereum Blockchain

我有一个私人以太坊区块链,上面有 5 台机器在上面挖矿。区块链的大小 [块数] 目前为 300。处理在后端完成 Java.

我需要运行以异步方式构建以下循环。循环的瓶颈是在执行以下命令期间:

EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();

该命令还可以 return 一个 Completablefuture<EthBlock> 对象,方法是用这里给出的 supplyAsync() 结束它 https://github.com/web3j/web3j#start-sending-requests 只需调用 supplyAync().get() 就可以删除并行性方面并使它同步运行。

public void businessLogic() throws Exception {
        recentBlocks = new ArrayList<EthBlock.Block>();
        for (long i = 1; i <= 300000; i++) {
            EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
            if (eb == null || eb.getBlock() == null) {
                continue;
            }
            EthBlock.Block block = eb.getBlock();
            recentBlocks.add(block);
        }
    }

我无法理解将代码转换为 CompleteableFuture 可以运行的方式的机制。目标是 'group' 将对 web.ethGetBlockNumber(...).supplyAync() 的多次调用合并到一个集合中,并一次调用它们来更新一个数组,该数组将被 EthBlock 个对象填充,即 recentBlocks

这是我想出的:

public void businessLogic() throws Exception {
    recentBlocks = new ArrayList<EthBlock.Block>();
    List<CompleteableFuture> compFutures = new ArrayList<>();
    for (long i = 0, i <= 300000, i++){
        CompleteableFuture<EthBlock> compFuture = eb3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).sendAsync();
        compFuture.thenAcceptAsync(eb -> // Doesn't look right
        EthBlock.Block block = eb.getBlock();
        recentBlock.add(block);)
        compFutures.add(compFuture);        
    }
    CompleteableFuture.allOf(compFutures).get();
}

实施 IntStream

    long start = System.nanoTime();
    recentBlocks = IntStream.rangeClosed(0, 300_000)
             .parallel()
             .mapToObj(i -> {
                try {
                    System.out.println("Current Thread -> " + Thread.currentThread());
                    return web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;
            })
             .filter(Objects::nonNull)
             .map(EthBlock::getBlock)
             .filter(Objects::nonNull)
             .collect(Collectors.toList());
    long stop = System.nanoTime();
    System.out.println("Time Elapsed: " + TimeUnit.MICROSECONDS.convert(stop-start, TimeUnit.NANOSECONDS));

CompletableFuture 包含 get 的覆盖: get(long timeout, TimeUnit unit)。如果在特定时间内没有return,您可以通过使获取超时来使用它进行轮询。

假设结果 List 的顺序不重要,您也许可以从并行流中获益而不是依赖 CompletableFuture

IntStream.rangeClosed(0, 300_000)
         .parallel()
         .mapToObj(i -> web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(BigInteger.valueOf(i)), true).send())
         .filter(Objects::nonNull)
         .map(EthBlock::getBlock)
         .filter(Objects::nonNull)
         .collect(Collectors.toList());

因为您说这没有帮助,让我们尝试使用缓存线程池的 ExecutorService

List<EthBlock.Block> blocks = Collections.synchronizedList(new ArrayList<>(300_000));

ExecutorService service = Executors.newCachedThreadPool();

for (int i = 0; i <= 300_000; i++) {
    BigInteger number = BigInteger.valueOf(i);

    service.execute(() -> {
        EthBlock eb = web3.ethGetBlockByNumber(new DefaultBlockParameterNumber(number), true).send();

        if (eb == null) {
            return;
        }

        EthBlock.Block block = eb.getBlock();

        if (block != null) {
            blocks.add(block);
        }
    });
}