使用 AsyncRestTemplate 多次制作 API 并等待所有完成

Make API multiple times with AsyncRestTemplate and wait for all to complete

我必须使用不同参数多次使用 RestTemplate 进行 Rest API 调用。 API 相同,但它是正在更改的参数。次数也是可变的。我想使用 AsyncRestTemplate 但我的主线程应该等到所有 API 调用都已成功完成。我还想处理每个 API 调用返回的响应。目前我正在使用 RestTemplate。基本形式如下。

List<String> listOfResponses = new ArrayList<String>();
for (Integer studentId : studentIdsList) {
    String respBody;
    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
    respBody = requestEntity.getBody();
    listOfResponses.add(respBody);          
}

在这种情况下如何实现 AsyncRestTemplate?

使用 AsyncRestTemplate(或任何异步 API,事实上)的主要思想是在第一时间发送所有请求,保留相应的期货,然后处理所有响应第二次。您可以简单地使用 2 个循环执行此操作:

List<ListenableFuture<ResponseEntity<String>>> responseFutures = new ArrayList<>();
for (Integer studentId : studentIdsList) {
    // FIXME studentId is not used
    ListenableFuture<ResponseEntity<String>> responseEntityFuture = restTemplate.exchange(url, method, requestEntity, String.class);
    responseFutures.add(responseEntityFuture);
}
// now all requests were send, so we can process the responses
List<String> listOfResponses = new ArrayList<>();
for (ListenableFuture<ResponseEntity<String>> future: responseFutures) {
    try {
        String respBody = future.get().getBody();
        listOfResponses.add(respBody);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
}

注意:如果您需要将响应与原始请求配对,您可以将未来列表替换为映射或请求+响应对象列表。

我还注意到您的问题中没有使用 studentId

您可以使用 Java 8 Stream API,如果这对您可行:

List<String> listOfResponses = studentIdsList.stream()
    .parrallel()
    .map({studentId ->
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, studentId, String.class);
        return responseEntity.getBody();
    })
    .collect(Collectors.toList());

这段代码基本上会执行两件事:

  1. 并行执行请求;
  2. 将请求的结果收集到列表中。

更新:同意@Didier L - 当您需要执行大量请求时,此解决方案可能无法正常工作。这是一个更新版本:

List<String> listOfResponses  = studentIdsList.stream()
                .map(studentId -> asyncRestTemplate.exchange(url, method, studentId, String.class)
                .collect(Collectors.toList()).stream()
                .map(this::retrieveResult)
                .collect(Collectors.toList());

    /**
     * Retrieves results of each request by blocking the main thread. Note that the actual request was performed on the previous step when
     * calling asyncRestTemplate.exchange(url, method, studentId, String.class)
     */
    private String retrieveResult(ListenableFuture<ResponseEntity<String>> listenableFuture) {
        try {
            return listenableFuture.get().getBody();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

这是我想建议的另一个解决方案,它使用 Spring 的 RestTemplate 而不是 AsyncRestTemplate。它还使用 Java 8 CompletableFuture.

public void sendRequestsAsync(List<Integer> studentList) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(studentList.size()); //List to hold all the completable futures
    List<String> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Integer studentId : studentList) { //Iterate student list
        CompletableFuture<Void> requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () -> restTemplate.exchange("URL/" + studentId, HttpMethod.GET, null, String.class),
                        yourOwnExecutor
                )//Supply the task you wanna run, in your case http request
                .thenApply((responseEntity) -> {
                    responses.add(responseEntity.getBody());
                    return responseEntity;
                })//now you can add response body to responses
                .thenAccept((responseEntity) -> {
                    doSomeFinalStuffWithResponse(responseEntity);
                })//here you can do more stuff with responseEntity (if you need to)
                .exceptionally(ex -> {
                    System.out.println(ex);
                    return null;
                });//do something here if an exception occurs in the execution;

        completableFutures.add(requestCompletableFuture);
    }

    try {
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).get(); //Now block till all of them are executed by building another completablefuture with others.
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

我更喜欢这个解决方案,因为我可以链接尽可能多的业务逻辑,而不必依赖 Spring 的异步发送内部机制。 显然你可以进一步清理代码,我现在还没有太关注它。