主线程不等待订阅者在反应订阅者中完成他们的任务
Main thread is not waiting for subscribers to finish their task in reactive subscriber
我在 spring 中有一项服务需要使用十种不同的方法获取数据。
我想让这些方法并行执行以执行一些数据库操作并 return 到父线程。但是父线程应该等到所有响应都来了然后 return 一个响应。
在我目前的方法中,我使用反应式单声道异步执行所有方法,但主线程不等待订阅者方法完成。
下面是我订阅的两个方法
private Mono<BaseResponse> getProfileDetails(long profileId){
return new Mono<BaseResponse>() {
@Override
public void subscribe(Subscriber<? super BaseResponse> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getProfileDetails");
s.onNext(new BaseResponse());
}
};
}
private Mono<Address> getAddressDetails(long profileId){
return new Mono<Address>() {
@Override
public void subscribe(Subscriber<? super Address> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getAddressDetails");
s.onNext(new Address());
}
};
}
下面是我的主要方法
public BaseResponse getDetails(long profileId){
ExecutorService executors = Executors.newFixedThreadPool(2);
Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
List<BaseResponse> list = new ArrayList<>();
profileDetail.mergeWith(addressDetail)
.subscribe(consumer -> {
list.add(consumer);
});
System.out.println("list: "+new Gson().toJson(list));
executors.shutdown();
return response;
}
下面是我的输出:
list: []
Inside getProfileDetails
Inside getAddressDetails
我的输出显示主线程没有等待订阅者完成它的任务,
那么我该如何处理这种情况呢?
我假设您的 getProfileDetails()
和 getAddressDetails()
方法只是占位符,因为它们在书面上没有多大意义。
话虽这么说,如果这是您的整个应用程序,并且您真的只是想在完成之前阻止,您不妨将当前的 subscribe()
调用更改为 doOnNext()
,然后只是 blockLast()
:
profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
list.add(consumer);
})
.blockLast();
出于充分的理由,在反应式应用程序中阻塞反应式线程通常是不明智的,但在这种情况下,您实际上只想在完全退出之前进行阻塞 - 所以我在这里看不到太多缺点。
我在 spring 中有一项服务需要使用十种不同的方法获取数据。
我想让这些方法并行执行以执行一些数据库操作并 return 到父线程。但是父线程应该等到所有响应都来了然后 return 一个响应。
在我目前的方法中,我使用反应式单声道异步执行所有方法,但主线程不等待订阅者方法完成。
下面是我订阅的两个方法
private Mono<BaseResponse> getProfileDetails(long profileId){
return new Mono<BaseResponse>() {
@Override
public void subscribe(Subscriber<? super BaseResponse> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getProfileDetails");
s.onNext(new BaseResponse());
}
};
}
private Mono<Address> getAddressDetails(long profileId){
return new Mono<Address>() {
@Override
public void subscribe(Subscriber<? super Address> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getAddressDetails");
s.onNext(new Address());
}
};
}
下面是我的主要方法
public BaseResponse getDetails(long profileId){
ExecutorService executors = Executors.newFixedThreadPool(2);
Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
List<BaseResponse> list = new ArrayList<>();
profileDetail.mergeWith(addressDetail)
.subscribe(consumer -> {
list.add(consumer);
});
System.out.println("list: "+new Gson().toJson(list));
executors.shutdown();
return response;
}
下面是我的输出:
list: []
Inside getProfileDetails
Inside getAddressDetails
我的输出显示主线程没有等待订阅者完成它的任务, 那么我该如何处理这种情况呢?
我假设您的 getProfileDetails()
和 getAddressDetails()
方法只是占位符,因为它们在书面上没有多大意义。
话虽这么说,如果这是您的整个应用程序,并且您真的只是想在完成之前阻止,您不妨将当前的 subscribe()
调用更改为 doOnNext()
,然后只是 blockLast()
:
profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
list.add(consumer);
})
.blockLast();
出于充分的理由,在反应式应用程序中阻塞反应式线程通常是不明智的,但在这种情况下,您实际上只想在完全退出之前进行阻塞 - 所以我在这里看不到太多缺点。