Spring: sending multiple requests simultaneously
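This code loads a list of posts from the database and then sends a POST request for each post, using that post's link data.
After each request, it extracts playerCount from the response and sets it on the corresponding post.
I used RestTemplate in this code, but the problem is that it takes too long.
So I want to change the code so that it sends all the requests right away and updates each post once all of them have completed.
How can I convert this code to do what I want?
I plan to run this code as a scheduled task.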
@Test
@Transactional
@Rollback(false)
public void postToGraphql2() throws JsonProcessingException, JSONException {
    String URL = "https://gt-space-data.herokuapp.com/graphql";
    RestTemplate restTemplate = new RestTemplate();
    HttpHeaders headers = new HttpHeaders();
    headers.add("content-type", "application/json");
    long startTime = System.currentTimeMillis();
    List<Posts> list = postsRepository.findPostsByCategoryStringContaining("GATHERTOWN");
    String query = "query gameData($apikey:String!,$spaceid:String!,$spacename:String!){gameData(spaceData:{apiKey: $apikey, spaceIdNum: $spaceid, spaceName: $spacename}){playerCount,}}";
    String operationName = "gameData";
    list.forEach(posts -> {
        String link = posts.getLink();
        if (link.contains("gather.town")) {
            // The space id sits between "app/" and the last "/"; the space name follows the last "/".
            String spaceid = link.substring(link.indexOf("app/") + 4, link.lastIndexOf("/"));
            String spacename = link.substring(link.lastIndexOf("/") + 1);
            String variables = "{\"apikey\": \"QUNCVEQGILsqe5\",\"spaceid\": \"" + spaceid + "\",\"spacename\" : \"" + spacename + "\"}";
            try {
                // Blocking call: the loop waits for each response before sending the next request.
                ResponseEntity<PlayerCountDto> response =
                        restTemplate.postForEntity(URL, new HttpEntity<>(createJsonQueries(query, operationName, variables), headers), PlayerCountDto.class);
                int playerCount = Objects.requireNonNull(response.getBody()).getData().getGameData().playerCount;
                posts.setPlayerCount(playerCount);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    });
    long stopTime = System.currentTimeMillis();
    long elapsedTime = stopTime - startTime;
    System.out.println(elapsedTime);
}
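The substring arithmetic in the loop can be hard to read at a glance, so here is a small sketch of the same extraction as a standalone helper. It assumes links look like https://gather.town/app/<spaceid>/<spacename>, which is only inferred from the indexes used in the question; GatherLinkSketch, parse and the sample link are hypothetical names for illustration.

```java
class GatherLinkSketch {
    // Returns {spaceId, spaceName}, or null when the link does not match the assumed shape.
    static String[] parse(String link) {
        int app = link.indexOf("app/");
        int lastSlash = link.lastIndexOf('/');
        // Guard against links with no "app/" segment or no name segment after the space id.
        if (app < 0 || lastSlash < app + 4) {
            return null;
        }
        String spaceId = link.substring(app + 4, lastSlash);
        String spaceName = link.substring(lastSlash + 1);
        return new String[] { spaceId, spaceName };
    }

    public static void main(String[] args) {
        // Hypothetical sample link, not taken from the question.
        String[] parts = parse("https://gather.town/app/abc123/My-Space");
        System.out.println(parts[0] + " / " + parts[1]);
    }
}
```

Returning null for an unexpected link avoids the StringIndexOutOfBoundsException that the inline substring calls would throw when the link has no name segment.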
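One option is to replace RestTemplate with WebClient. WebClient is part of Spring WebFlux, which was introduced in Spring Framework 5.0.
WebClient is asynchronous and non-blocking.
The following documentation is a good starting point: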
https://docs.spring.io/spring-boot/docs/2.0.3.RELEASE/reference/html/boot-features-webclient.html
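As a rough illustration of the "send everything, then update once all responses are in" shape the question asks for, here is a minimal sketch that uses only the JDK's CompletableFuture rather than WebClient. The nested Post class, the fetchPlayerCount function and the pool cap of 16 threads are assumptions made for the example; the fake lookup in main stands in for the real GraphQL call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToIntFunction;

class ConcurrentFetchSketch {
    // Hypothetical stand-in for the Posts entity from the question.
    static class Post {
        final String link;
        int playerCount = -1;
        Post(String link) { this.link = link; }
    }

    // Starts one lookup per post, waits for all of them, then applies the results.
    static void updateAll(List<Post> posts, ToIntFunction<String> fetchPlayerCount) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(posts.size(), 16)));
        try {
            // Fan out: every lookup is submitted before any result is awaited.
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (Post p : posts) {
                pending.add(CompletableFuture.supplyAsync(() -> fetchPlayerCount.applyAsInt(p.link), pool));
            }
            // Join: block once, until the slowest lookup has finished.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            // All results are available now, so update the posts on the calling thread.
            for (int i = 0; i < posts.size(); i++) {
                posts.get(i).playerCount = pending.get(i).join();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Post> posts = Arrays.asList(new Post("a"), new Post("bb"), new Post("ccc"));
        // Fake remote call: sleeps 200 ms, then returns the link length.
        ToIntFunction<String> fake = link -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return link.length();
        };
        long start = System.currentTimeMillis();
        updateAll(posts, fake);
        System.out.println("elapsed ms: " + (System.currentTimeMillis() - start));
        posts.forEach(p -> System.out.println(p.link + " -> " + p.playerCount));
    }
}
```

With WebClient, a comparable shape would probably be built from Reactor operators such as Flux.fromIterable(list).flatMap(...) followed by a single collectList().block(), so that only the final wait blocks; treat that as a direction to try rather than a verified solution for this project.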
I changed the code from RestTemplate to WebClient, but it does not work properly when I use subscribe().
It works fine with .block(), but that takes too long.
How can I get .subscribe() to work?
@Test
@Transactional
@Rollback(false)
public void postToGraphql3() throws JsonProcessingException, JSONException {
    String URL = "https://gt-space-data.herokuapp.com/graphql";
    WebClient webClient = webClientBuilder.baseUrl(URL).defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE).build();
    long startTime = System.currentTimeMillis();
    List<Posts> list = postsRepository.findPostsByCategoryStringContaining("GATHERTOWN");
    String query = "query gameData($apikey:String!,$spaceid:String!,$spacename:String!){gameData(spaceData:{apiKey: $apikey, spaceIdNum: $spaceid, spaceName: $spacename}){playerCount,}}";
    String opertationName = "gameData";
    list.forEach(posts -> {
        String link = posts.getLink();
        String graphqlQuery = null;
        if (link.contains("gather.town")) {
            String spaceid = link.substring(link.indexOf("app/") + 4, link.lastIndexOf("/"));
            String spacename = link.substring(link.lastIndexOf("/") + 1);
            String variables = "{\"apikey\": \"QUNCVEQGILsqeXR5\",\"spaceid\": \"" + spaceid + "\",\"spacename\" : \"" + spacename + "\"}";
            try {
                graphqlQuery = createJsonQueries(query, opertationName, variables);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
        // Skip posts with no query to send; bodyValue() does not accept null.
        if (graphqlQuery == null) {
            return;
        }
        // not working when using .subscribe()
        Mono<PlayerCountDto> playerCountDtoMono = webClient.post().bodyValue(graphqlQuery)
                .retrieve()
                .bodyToMono(PlayerCountDto.class);
        playerCountDtoMono.doOnSuccess(
                playerCountDto -> {
                    System.out.println(playerCountDto.getData().gameData.getPlayerCount());
                }
        ).subscribe();
    });
    long stopTime = System.currentTimeMillis();
    long elapsedTime = stopTime - startTime;
    System.out.println(elapsedTime);
}
Log
2022-04-24 12:15:08.101 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.1 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.102 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.1 : | request(unbounded)
2022-04-24 12:15:08.856 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.2 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.856 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.2 : | request(unbounded)
2022-04-24 12:15:08.859 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.3 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.859 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.3 : | request(unbounded)
2022-04-24 12:15:08.861 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.4 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.862 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.4 : | request(unbounded)
2022-04-24 12:15:08.867 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.5 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.867 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.5 : | request(unbounded)
2022-04-24 12:15:08.869 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.6 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.870 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.6 : | request(unbounded)
2022-04-24 12:15:08.872 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.7 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.873 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.7 : | request(unbounded)
2022-04-24 12:15:08.876 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.8 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.877 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.8 : | request(unbounded)
2022-04-24 12:15:08.880 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.9 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.880 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.9 : | request(unbounded)
2022-04-24 12:15:08.882 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.10 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.882 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.10 : | request(unbounded)
2022-04-24 12:15:08.884 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.11 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.884 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.11 : | request(unbounded)
2022-04-24 12:15:08.887 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.12 : | onSubscribe([Fuseable] MonoPeekTerminal.MonoTerminalPeekSubscriber)
2022-04-24 12:15:08.887 INFO 29112 --- [ Test worker] reactor.Mono.PeekTerminal.12 : | request(unbounded)
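One plausible reading of this log (an interpretation, not something the thread confirms): every subscription is made and requested, but no result is ever logged, which would be consistent with the test method returning before any response arrives, because subscribe() only starts the work and does not wait for it. The sketch below reproduces that effect with the JDK's CompletableFuture instead of Reactor; fakeRequest, fireAndForget and fireAndWait are hypothetical names, and the delayed executor stands in for network latency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

class SubscribeVersusWaitSketch {
    // Hypothetical async call: completes with the link's length after delayMs.
    static CompletableFuture<Integer> fakeRequest(String link, long delayMs) {
        return CompletableFuture.supplyAsync(
                () -> link.length(),
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    // Registers callbacks and returns immediately, much like calling subscribe().
    static Queue<Integer> fireAndForget(List<String> links, long delayMs) {
        Queue<Integer> seen = new ConcurrentLinkedQueue<>();
        for (String link : links) {
            fakeRequest(link, delayMs).thenAccept(seen::add);
        }
        return seen; // the callbacks may not have run yet when this returns
    }

    // Starts every request first, then waits once for all callbacks to finish.
    static Queue<Integer> fireAndWait(List<String> links, long delayMs) {
        Queue<Integer> seen = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String link : links) {
            pending.add(fakeRequest(link, delayMs).thenAccept(seen::add));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return seen;
    }

    public static void main(String[] args) {
        List<String> links = Arrays.asList("a", "bb", "ccc");
        System.out.println("fire-and-forget saw: " + fireAndForget(links, 300).size());
        System.out.println("fire-and-wait saw: " + fireAndWait(links, 300).size());
    }
}
```

In Reactor terms, the waiting variant would likely mean combining the per-post Mono values (for example with Flux.fromIterable(...).flatMap(...)) and calling block() or blockLast() once on the combined publisher, instead of calling subscribe() on each Mono inside the loop.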