如何在 Spring 响应式(Reactive)技术栈的线程中模拟延迟
How to simulate a delay in a Spring reactive stack thread
我正在构建一个简单应用程序的两个 Spring 版本。其中一个是 Servlet 堆栈,另一个是 Reactive 堆栈。我的目标是展示 Reactive 堆栈在等待其他内容时不会阻止线程处理其他请求。
因此,我在两个版本的代码中都模拟了延迟。但是,当延迟生效时,响应式技术栈似乎不再处理其他请求。换句话说,它并没有表现出响应式(非阻塞)的行为。是我做错了什么吗?是我误解了 Spring 响应式的工作方式,还是我模拟延迟的方式不对?
响应式技术栈的处理程序类(handler class)
/**
 * Functional handler that looks up a single {@link Grade} by the {@code id} path variable and
 * writes it as a JSON response body after an artificial delay.
 */
@Component
@RequiredArgsConstructor
public class GradeHandler {

    /** Delay applied by {@link #gradeHandler(ServerRequest)}. */
    private static final Duration SHORT_DELAY = Duration.ofSeconds(1);

    /** Delay applied by {@link #gradeHandler_blocking(ServerRequest)}. */
    private static final Duration LONG_DELAY = Duration.ofSeconds(10000);

    private final GradeRepository gradeRepository;

    /**
     * Responds with the requested grade, delayed by one second.
     *
     * @param serverRequest request carrying the {@code id} path variable
     * @return the 200 OK JSON response whose body is the delayed grade
     */
    public Mono<ServerResponse> gradeHandler(ServerRequest serverRequest) {
        return delayedGradeResponse(serverRequest, SHORT_DELAY);
    }

    /**
     * Responds with the requested grade, delayed by 10000 seconds.
     *
     * @param serverRequest request carrying the {@code id} path variable
     * @return the 200 OK JSON response whose body is the delayed grade
     */
    public Mono<ServerResponse> gradeHandler_blocking(ServerRequest serverRequest) {
        return delayedGradeResponse(serverRequest, LONG_DELAY);
    }

    /** Shared implementation of both handlers; only the delay differs. */
    private Mono<ServerResponse> delayedGradeResponse(ServerRequest request, Duration delay) {
        Mono<Grade> delayedGrade = getGradeByIdDelayed(request.pathVariable("id"), delay);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(delayedGrade, Grade.class);
    }

    /** Fetches the grade from the repository and delays the emitted element by {@code duration}. */
    private Mono<Grade> getGradeByIdDelayed(String id, Duration duration) {
        Mono<Grade> found = gradeRepository.findById(id);
        return found.delayElement(duration);
    }
}
响应式技术栈的路由器配置类(Router Config)
/**
 * Declares the functional routes that dispatch to {@link GradeHandler}.
 */
@Configuration
public class GradeRouterConfig {

    /**
     * Maps GET {@code grade/{id}} to {@code GradeHandler#gradeHandler} and
     * GET {@code blocking/grade/{id}} to {@code GradeHandler#gradeHandler_blocking}.
     * Both routes additionally require the request to accept JSON.
     *
     * @param gradeHandler handler bean supplying both handler functions
     * @return the combined router function
     */
    @Bean
    public RouterFunction<ServerResponse> gradeRouter(GradeHandler gradeHandler) {
        RouterFunction<ServerResponse> gradeRoute = RouterFunctions.route(
                GET("grade/{id}").and(accept(MediaType.APPLICATION_JSON)),
                gradeHandler::gradeHandler);

        return gradeRoute.andRoute(
                GET("blocking/grade/{id}").and(accept(MediaType.APPLICATION_JSON)),
                gradeHandler::gradeHandler_blocking);
    }
}
存储库
/**
 * Reactive CRUD repository for {@link Grade} entities identified by a {@code String} id.
 * Declares no methods of its own; everything is inherited from {@code ReactiveCrudRepository}.
 */
@Repository
public interface GradeRepository extends ReactiveCrudRepository<Grade, String> {}
实体
/**
 * Grade entity. All fields are {@code final}; the constructor, accessors and builder come from
 * the class-level annotations (presumably Lombok — confirm against the file's imports).
 */
@Document
@Data
@RequiredArgsConstructor
@Builder
public class Grade {

    /** Identifier of the stored entity. */
    @Id
    private final String id;

    /** Numeric grade value. */
    private final double grade;

    /** Name of the student the grade belongs to. */
    private final String studentName;
}
这是调用端点的客户端 JavaScript 代码:
// Grade ids that the request helpers pick from at random.
// Declared explicitly: the bare assignment created an implicit global, which
// throws a ReferenceError in strict mode and in ES modules.
var gradesToBeQuried = [1, 2, 3, 4, 5];
// Timestamp taken when the script starts; baseline for the elapsed-time log.
var t0 = performance.now();
/**
 * Sends an asynchronous GET request and passes the response text to a callback.
 *
 * @param {string} theUrl URL to request.
 * @param {function(string): void} callback Called with `responseText` when the load event fires.
 */
function httpGetAsync(theUrl, callback) {
    const request = new XMLHttpRequest();
    request.open("GET", theUrl, true); // third argument `true` => asynchronous
    request.onload = function () {
        callback(request.responseText);
    };
    request.send(null);
}
/**
 * Picks one element of `grades` uniformly at random.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {*} A random element, or `undefined` when the array is empty.
 */
function getRandomGradeId(grades) {
    const randomIndex = Math.floor(Math.random() * grades.length);
    return grades[randomIndex];
}
/**
 * Builds the URL of the regular grade endpoint for a randomly chosen grade id.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {string} "http://localhost:8080/grade/" followed by the chosen id.
 */
function getURLWithRandomGradeID(grades) {
    // Declared locally: the original assignment leaked `randomGradeId` as an implicit global.
    const randomGradeId = getRandomGradeId(grades);
    return "http://localhost:8080/grade/" + randomGradeId;
}
/**
 * Builds the URL of the long-delay ("blocking") grade endpoint for a randomly chosen grade id.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {string} "http://localhost:8080/blocking/grade/" followed by the chosen id.
 */
function getURLWithRandomGradeID_blocking(grades) {
    // Declared locally: the original assignment leaked `randomGradeId` as an implicit global.
    const randomGradeId = getRandomGradeId(grades);
    return "http://localhost:8080/blocking/grade/" + randomGradeId;
}
/**
 * Records that one query has completed and, once all of them have, logs the
 * time elapsed since `t0`.
 *
 * Asynchronous responses may complete in any order, so the arrival of the
 * response with the highest index does not mean every query is done. The
 * function therefore counts completions instead of comparing the index.
 *
 * @param {number} currentIndex Index of the query that just completed (kept for interface compatibility).
 * @param {number} totalNumberOfQueries Number of queries in the batch.
 */
function checkFinishConditionAndLog(currentIndex, totalNumberOfQueries) {
    // The counter lives on the function object so no extra global is needed.
    checkFinishConditionAndLog.completed = (checkFinishConditionAndLog.completed || 0) + 1;
    if (checkFinishConditionAndLog.completed < totalNumberOfQueries) {
        return;
    }
    checkFinishConditionAndLog.completed = 0; // reset so a later batch is counted from zero
    var t1 = performance.now();
    console.log("it took " + (t1 - t0) / 1000 + " seconds.");
}
/**
 * Requests a random grade from the regular endpoint, logs the response
 * prefixed with the query index, then runs the finish check.
 *
 * @param {number} currentIndex Index of this query within the batch.
 * @param {number} totalNumberOfQueries Number of queries in the batch.
 */
function queryGrade(currentIndex, totalNumberOfQueries) {
    const targetUrl = getURLWithRandomGradeID(gradesToBeQuried);
    const onResponse = function (result) {
        console.log(currentIndex + result);
        checkFinishConditionAndLog(currentIndex, totalNumberOfQueries);
    };
    httpGetAsync(targetUrl, onResponse);
}
/**
 * Requests a random grade from the long-delay endpoint and logs a fixed
 * message when the response arrives.
 */
function queryGrade_blocking() {
    const targetUrl = getURLWithRandomGradeID_blocking(gradesToBeQuried);
    const onResponse = function (result) {
        return console.log("blocking thread is done");
    };
    httpGetAsync(targetUrl, onResponse);
}
/**
 * Fires 15 requests at the long-delay endpoint, then 1000 requests at the
 * regular endpoint, without waiting for any response in between.
 */
function runQueries() {
    const totalNumberOfQueries = 1000;
    const totalNumberOfBlockingQueries = 15;

    // The long-delay requests are issued first, as in the original ordering.
    for (let b = 0; b < totalNumberOfBlockingQueries; b++) {
        queryGrade_blocking();
    }
    for (let q = 0; q < totalNumberOfQueries; q++) {
        queryGrade(q, totalNumberOfQueries);
    }
}
// Entry point: start issuing the requests as soon as this script is evaluated.
runQueries();
经过大量研究,我找到了一篇相关资料,其结论是 delayElement 函数阻塞了线程。
我正在构建一个简单应用程序的两个 Spring 版本。其中一个是 Servlet 堆栈,另一个是 Reactive 堆栈。我的目标是展示 Reactive 堆栈在等待其他内容时不会阻止线程处理其他请求。因此,我在两个版本的代码中都模拟了延迟。但是,当延迟生效时,响应式技术栈似乎不再处理其他请求。换句话说,它并没有表现出响应式(非阻塞)的行为。是我做错了什么吗?是我误解了 Spring 响应式的工作方式,还是我模拟延迟的方式不对?
响应式技术栈的处理程序类(handler class)
/**
 * Functional handler that looks up a single {@link Grade} by the {@code id} path variable and
 * writes it as a JSON response body after an artificial delay.
 */
@Component
@RequiredArgsConstructor
public class GradeHandler {

    /** Delay applied by {@link #gradeHandler(ServerRequest)}. */
    private static final Duration SHORT_DELAY = Duration.ofSeconds(1);

    /** Delay applied by {@link #gradeHandler_blocking(ServerRequest)}. */
    private static final Duration LONG_DELAY = Duration.ofSeconds(10000);

    private final GradeRepository gradeRepository;

    /**
     * Responds with the requested grade, delayed by one second.
     *
     * @param serverRequest request carrying the {@code id} path variable
     * @return the 200 OK JSON response whose body is the delayed grade
     */
    public Mono<ServerResponse> gradeHandler(ServerRequest serverRequest) {
        return delayedGradeResponse(serverRequest, SHORT_DELAY);
    }

    /**
     * Responds with the requested grade, delayed by 10000 seconds.
     *
     * @param serverRequest request carrying the {@code id} path variable
     * @return the 200 OK JSON response whose body is the delayed grade
     */
    public Mono<ServerResponse> gradeHandler_blocking(ServerRequest serverRequest) {
        return delayedGradeResponse(serverRequest, LONG_DELAY);
    }

    /** Shared implementation of both handlers; only the delay differs. */
    private Mono<ServerResponse> delayedGradeResponse(ServerRequest request, Duration delay) {
        Mono<Grade> delayedGrade = getGradeByIdDelayed(request.pathVariable("id"), delay);
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(delayedGrade, Grade.class);
    }

    /** Fetches the grade from the repository and delays the emitted element by {@code duration}. */
    private Mono<Grade> getGradeByIdDelayed(String id, Duration duration) {
        Mono<Grade> found = gradeRepository.findById(id);
        return found.delayElement(duration);
    }
}
响应式技术栈的路由器配置类(Router Config)
/**
 * Declares the functional routes that dispatch to {@link GradeHandler}.
 */
@Configuration
public class GradeRouterConfig {

    /**
     * Maps GET {@code grade/{id}} to {@code GradeHandler#gradeHandler} and
     * GET {@code blocking/grade/{id}} to {@code GradeHandler#gradeHandler_blocking}.
     * Both routes additionally require the request to accept JSON.
     *
     * @param gradeHandler handler bean supplying both handler functions
     * @return the combined router function
     */
    @Bean
    public RouterFunction<ServerResponse> gradeRouter(GradeHandler gradeHandler) {
        RouterFunction<ServerResponse> gradeRoute = RouterFunctions.route(
                GET("grade/{id}").and(accept(MediaType.APPLICATION_JSON)),
                gradeHandler::gradeHandler);

        return gradeRoute.andRoute(
                GET("blocking/grade/{id}").and(accept(MediaType.APPLICATION_JSON)),
                gradeHandler::gradeHandler_blocking);
    }
}
存储库
/**
 * Reactive CRUD repository for {@link Grade} entities identified by a {@code String} id.
 * Declares no methods of its own; everything is inherited from {@code ReactiveCrudRepository}.
 */
@Repository
public interface GradeRepository extends ReactiveCrudRepository<Grade, String> {}
实体
/**
 * Grade entity. All fields are {@code final}; the constructor, accessors and builder come from
 * the class-level annotations (presumably Lombok — confirm against the file's imports).
 */
@Document
@Data
@RequiredArgsConstructor
@Builder
public class Grade {

    /** Identifier of the stored entity. */
    @Id
    private final String id;

    /** Numeric grade value. */
    private final double grade;

    /** Name of the student the grade belongs to. */
    private final String studentName;
}
这是调用端点
// Client-side JavaScript that calls the endpoints.
// The heading text had been fused onto the identifier below, so the variable
// the rest of the script reads (`gradesToBeQuried`) was never defined; it is
// now separated out and declared explicitly.
var gradesToBeQuried = [1, 2, 3, 4, 5];
// Timestamp taken when the script starts; baseline for the elapsed-time log.
var t0 = performance.now();
/**
 * Sends an asynchronous GET request and passes the response text to a callback.
 *
 * @param {string} theUrl URL to request.
 * @param {function(string): void} callback Called with `responseText` when the load event fires.
 */
function httpGetAsync(theUrl, callback) {
    const request = new XMLHttpRequest();
    request.open("GET", theUrl, true); // third argument `true` => asynchronous
    request.onload = function () {
        callback(request.responseText);
    };
    request.send(null);
}
/**
 * Picks one element of `grades` uniformly at random.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {*} A random element, or `undefined` when the array is empty.
 */
function getRandomGradeId(grades) {
    const randomIndex = Math.floor(Math.random() * grades.length);
    return grades[randomIndex];
}
/**
 * Builds the URL of the regular grade endpoint for a randomly chosen grade id.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {string} "http://localhost:8080/grade/" followed by the chosen id.
 */
function getURLWithRandomGradeID(grades) {
    // Declared locally: the original assignment leaked `randomGradeId` as an implicit global.
    const randomGradeId = getRandomGradeId(grades);
    return "http://localhost:8080/grade/" + randomGradeId;
}
/**
 * Builds the URL of the long-delay ("blocking") grade endpoint for a randomly chosen grade id.
 *
 * @param {Array} grades Candidate grade ids.
 * @returns {string} "http://localhost:8080/blocking/grade/" followed by the chosen id.
 */
function getURLWithRandomGradeID_blocking(grades) {
    // Declared locally: the original assignment leaked `randomGradeId` as an implicit global.
    const randomGradeId = getRandomGradeId(grades);
    return "http://localhost:8080/blocking/grade/" + randomGradeId;
}
/**
 * Records that one query has completed and, once all of them have, logs the
 * time elapsed since `t0`.
 *
 * Asynchronous responses may complete in any order, so the arrival of the
 * response with the highest index does not mean every query is done. The
 * function therefore counts completions instead of comparing the index.
 *
 * @param {number} currentIndex Index of the query that just completed (kept for interface compatibility).
 * @param {number} totalNumberOfQueries Number of queries in the batch.
 */
function checkFinishConditionAndLog(currentIndex, totalNumberOfQueries) {
    // The counter lives on the function object so no extra global is needed.
    checkFinishConditionAndLog.completed = (checkFinishConditionAndLog.completed || 0) + 1;
    if (checkFinishConditionAndLog.completed < totalNumberOfQueries) {
        return;
    }
    checkFinishConditionAndLog.completed = 0; // reset so a later batch is counted from zero
    var t1 = performance.now();
    console.log("it took " + (t1 - t0) / 1000 + " seconds.");
}
/**
 * Requests a random grade from the regular endpoint, logs the response
 * prefixed with the query index, then runs the finish check.
 *
 * @param {number} currentIndex Index of this query within the batch.
 * @param {number} totalNumberOfQueries Number of queries in the batch.
 */
function queryGrade(currentIndex, totalNumberOfQueries) {
    const targetUrl = getURLWithRandomGradeID(gradesToBeQuried);
    const onResponse = function (result) {
        console.log(currentIndex + result);
        checkFinishConditionAndLog(currentIndex, totalNumberOfQueries);
    };
    httpGetAsync(targetUrl, onResponse);
}
/**
 * Requests a random grade from the long-delay endpoint and logs a fixed
 * message when the response arrives.
 */
function queryGrade_blocking() {
    const targetUrl = getURLWithRandomGradeID_blocking(gradesToBeQuried);
    const onResponse = function (result) {
        return console.log("blocking thread is done");
    };
    httpGetAsync(targetUrl, onResponse);
}
/**
 * Fires 15 requests at the long-delay endpoint, then 1000 requests at the
 * regular endpoint, without waiting for any response in between.
 */
function runQueries() {
    const totalNumberOfQueries = 1000;
    const totalNumberOfBlockingQueries = 15;

    // The long-delay requests are issued first, as in the original ordering.
    for (let b = 0; b < totalNumberOfBlockingQueries; b++) {
        queryGrade_blocking();
    }
    for (let q = 0; q < totalNumberOfQueries; q++) {
        queryGrade(q, totalNumberOfQueries);
    }
}
// Entry point: start issuing the requests as soon as this script is evaluated.
runQueries();
经过大量研究,我找到了一篇相关资料,其结论是 delayElement 函数阻塞了线程。