Spring 同时请求多个

Spring multiple requests simultaneously

这是在收到数据库中 post 的列表后发送 POST 请求以及每个 post 的 link 数据的代码。

在每个 link 请求 POST 后,从响应中提取 playerCount 并将其更新为每个 post。

我在这段代码中使用了Resttemplate,但是有一个问题耗时太长。

所以我想将此代码更改为立即发送请求并在所有请求完成后更新每个 post。

如何将此代码转换为我想要的代码?

我打算将这段代码作为计划任务使用。

    @Test
    @Transactional
    @Rollback(false)
    public void postToGraphql2() throws JsonProcessingException, JSONException {
        String URL = "https://gt-space-data.herokuapp.com/graphql";
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.add("content-type", "application/json");

        long startTime = System.currentTimeMillis();

        List<Posts> list = postsRepository.findPostsByCategoryStringContaining("GATHERTOWN");
        String query = "query gameData($apikey:String!,$spaceid:String!,$spacename:String!){gameData(spaceData:{apiKey: $apikey, spaceIdNum: $spaceid, spaceName: $spacename}){playerCount,}}";
        String opertationName = "gameData";

        list.forEach(posts -> {
            String link = posts.getLink();
            if(link.contains("gather.town")){
                String spaceid = link.substring(link.indexOf("app/")+4,link.lastIndexOf("/"));
                String spacename = link.substring(link.lastIndexOf("/")+1, link.length());
                String variables = "{\"apikey\": \"QUNCVEQGILsqe5\",\"spaceid\": \""+spaceid+"\",\"spacename\" : \""+spacename+"\"}";

                try {
                    ResponseEntity<PlayerCountDto> response =
                            restTemplate.postForEntity(URL, new HttpEntity<>(createJsonQueries(query,opertationName,variables), headers), PlayerCountDto.class);
                    int playerCount = Objects.requireNonNull(response.getBody()).getData().getGameData().playerCount;
                    posts.setPlayerCount(playerCount);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        });

        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;

        System.out.println(elapsedTime);
    }

一种方法是用 'Webclient' 替换 Resttemplate。 Webclient 是 Spring Webflux 的一部分,在 Spring 5.0 中引入。 Webclient 是异步的并且 non-blocking.

您可以从以下文档开始

https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/html/boot-features-webclient.html

我将代码 Resttemplate 更改为 webclient,但在使用 subscribe() 时无法正常工作。

使用 .block() 时效果很好,但耗时太长。

如何让 .subscribe() 工作?

    @Test
    @Transactional
    @Rollback(false)
    public void postToGraphql3() throws JsonProcessingException, JSONException {
        String URL = "https://gt-space-data.herokuapp.com/graphql";

        WebClient webClient = webClientBuilder.baseUrl(URL).defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build();

        long startTime = System.currentTimeMillis();

        List<Posts> list = postsRepository.findPostsByCategoryStringContaining("GATHERTOWN");
        String query = "query gameData($apikey:String!,$spaceid:String!,$spacename:String!){gameData(spaceData:{apiKey: $apikey, spaceIdNum: $spaceid, spaceName: $spacename}){playerCount,}}";
        String opertationName = "gameData";

        list.forEach(posts -> {
            String link = posts.getLink();
            String graphqlQuery = null;
            if(link.contains("gather.town")){
                String spaceid = link.substring(link.indexOf("app/")+4,link.lastIndexOf("/"));
                String spacename = link.substring(link.lastIndexOf("/")+1, link.length());
                String variables = "{\"apikey\": \"QUNCVEQGILsqeXR5\",\"spaceid\": \""+spaceid+"\",\"spacename\" : \""+spacename+"\"}";

                try {
                    graphqlQuery = createJsonQueries(query,opertationName,variables);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }

            //not working when using .subscribe()
            Mono<PlayerCountDto> playerCountDtoMono = webClient.post().bodyValue(graphqlQuery)
                    .retrieve()
                    .bodyToMono(PlayerCountDto.class);

            playerCountDtoMono.doOnSuccess(
                    playerCountDto -> {
                        System.out.println(playerCountDto.getData().gameData.getPlayerCount());
                    }
            ).subscribe();
        });

        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;

        System.out.println(elapsedTime);

    }

日志

2022-04-24 12:15:08.101  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.1              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.102  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.1              : | request(unbounded)
2022-04-24 12:15:08.856  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.2              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.856  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.2              : | request(unbounded)
2022-04-24 12:15:08.859  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.3              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.859  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.3              : | request(unbounded)
2022-04-24 12:15:08.861  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.4              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.862  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.4              : | request(unbounded)
2022-04-24 12:15:08.867  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.5              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.867  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.5              : | request(unbounded)
2022-04-24 12:15:08.869  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.6              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.870  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.6              : | request(unbounded)
2022-04-24 12:15:08.872  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.7              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.873  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.7              : | request(unbounded)
2022-04-24 12:15:08.876  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.8              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.877  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.8              : | request(unbounded)
2022-04-24 12:15:08.880  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.9              : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.880  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.9              : | request(unbounded)
2022-04-24 12:15:08.882  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.10             : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.882  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.10             : | request(unbounded)
2022-04-24 12:15:08.884  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.11             : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.884  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.11             : | request(unbounded)
2022-04-24 12:15:08.887  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.12             : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.887  INFO 29112 --- [    Test worker] reactor.Mono.PeekTerminal.12             : | request(unbounded)