How to combine/chain multiple Mono/Flux containing different datatypes without nested subscriptions
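We are using project-reactor to retrieve some data from external web services and build a set of result objects.
First we need to fetch some master data that is required to trigger the next web service calls. Once the master data is available, we retrieve more data based on it. Next we have to wait for all Monos to emit their results. Then we process all the data and build the result objects.
We don't have much experience with reactive streams. Our solution with nested subscriptions works, but we believe there may be a better way to achieve what we want.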
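Question 1
Masterdata_A and Masterdata_B can be fetched in parallel, but how can this be expressed reactively without nesting?
Each result of getFluxMasterdata_B should be combined with the single result of getMonoMasterdata_A.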
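Question 2
The tuple holding both pieces of master data should be throttled in some way so that the web service is not overwhelmed by too many data requests. The current delay of 1 second is just a guess that seems to work, but it would be better to define the maximum number of parallel executions of the first inner flatMap, so that at most N web service calls are pending at any time.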
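Question 3
In the future we may need to fetch more data from the web service to build the ProcessingResult. Is there a best practice for defining reactive streams so that they stay readable and understandable?
Is nesting reactive streams fine, or should it be avoided (keeping everything at the top level)?
Domain model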
private static class Masterdata_A
{
    private List<MasterdataRecord_A> records;
}
private static class MasterdataRecord_A { /* ... business relevant fields */ }
private static class MasterdataRecord_B { /* ... business relevant fields */ }
private static class Data_A { /* ... business relevant fields */ }
private static class Data_B { /* ... business relevant fields */ }
private static class Data_C { /* ... business relevant fields */ }
private static class ProcessingResult { /* ... business relevant fields */ }
WebserviceImpl
private static class Webservice
{
    private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
    private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }
    private Mono<Data_A> getMonoData_A(Masterdata_A masterdataA) { /* fetch data from external webservice */ }
    private Mono<Data_B> getMonoData_B(MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
    private Mono<Data_C> getMonoData_C(Masterdata_A masterdataA, MasterdataRecord_B masterdataB) { /* fetch data from external webservice */ }
}
BusinessServiceImpl
public class BusinessService
{
    public void processData(...params...)
    {
        Webservice webservice = getWebservice();
        // As soon as Mono<Masterdata_A> emits its result AND Flux<MasterdataRecord_B> emits its first result, the first inner flatMap should be executed
        // to fetch some extra data from the service based on the current master data.
        // To build the ProcessingResult we need access to all data available in the current context.
        webservice.getMonoMasterdata_A()
            .subscribe((Masterdata_A masterdataA) -> {
                webservice.getFluxMasterdata_B()
                    .delayElements(Duration.ofSeconds(1))
                    .flatMap((MasterdataRecord_B masterdataB) -> {
                        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                        // wait for the results of all Monos
                        return Mono.zip(monoA, monoB, monoC);
                    })
                    .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                        Data_A dataA = data.getT1();
                        Data_B dataB = data.getT2();
                        Data_C dataC = data.getT3();
                        // create result from masterdataA, masterdataB, dataA, dataB, dataC
                        ProcessingResult result = ...;
                        return Mono.just(result);
                    })
                    .subscribe(processingResult -> {
                        // store result to db/filesystem
                    });
            });
    }
}
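The code above relies on Mono.zip to "wait for result of all Monos". A minimal sketch of that behaviour, assuming reactor-core is on the classpath and using Mono.just/delayElement as hypothetical stand-ins for the web service calls (ZipSketch and its methods are illustrative names only): zip subscribes to all sources at once and emits a Tuple3 only when each of them has produced a value. One detail worth knowing is that if any source completes empty, the zip completes empty too, so that element silently disappears from the surrounding flatMap.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;

final class ZipSketch {

    // Three independent (simulated) calls; zip subscribes to all of them at once
    // and emits only when every one of them has produced a value.
    static Tuple3<String, String, String> allPresent() {
        return Mono.zip(
                        Mono.just("dataA").delayElement(Duration.ofMillis(30)),
                        Mono.just("dataB").delayElement(Duration.ofMillis(10)),
                        Mono.just("dataC"))
                .block();
    }

    // If any source completes empty, zip completes empty as well;
    // block() then returns null instead of a tuple.
    static Tuple3<String, String, String> oneEmpty() {
        return Mono.zip(Mono.just("dataA"), Mono.<String>empty(), Mono.just("dataC"))
                .block();
    }

    // One way to keep the element: substitute a default for the empty source.
    static Tuple3<String, String, String> oneDefaulted() {
        return Mono.zip(
                        Mono.just("dataA"),
                        Mono.<String>empty().defaultIfEmpty("none"),
                        Mono.just("dataC"))
                .block();
    }
}
```

Whether a default value, an error (for example via switchIfEmpty) or dropping the element is right depends on what an empty response from the real web service means.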
Question 1
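Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();
// Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...].
// If "A" arrives before the first B record, the result will be [(A,1), (A,2), (A,3), ...].
// Caveat: combineLatest only keeps the latest value of each source, so B records emitted
// before "A" arrives are overwritten and only the most recent of them is paired with "A".
// Both sources are subscribed at the same time, so monoMasterdata_a and masterdataRecordBFlux run in parallel.
Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)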
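A small sketch of the combineLatest caveat, assuming reactor-core is on the classpath; slowMasterdataA and fastMasterdataB are hypothetical stand-ins for getMonoMasterdata_A and getFluxMasterdata_B, with the Mono deliberately slower than the Flux. It contrasts combineLatest with a lossless variant that caches the Mono and pairs every B record with it. With cache(), the Mono is requested when the first B record arrives, so the two calls overlap but do not start at exactly the same instant.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

final class CombineMasterdataSketch {

    // Stand-in for getMonoMasterdata_A: a single value that arrives late.
    static Mono<String> slowMasterdataA() {
        return Mono.just("A").delayElement(Duration.ofMillis(200));
    }

    // Stand-in for getFluxMasterdata_B: three records that arrive immediately.
    static Flux<Integer> fastMasterdataB() {
        return Flux.just(1, 2, 3);
    }

    // combineLatest remembers only the most recent value of each source,
    // so B records emitted before A arrives are overwritten.
    static List<Tuple2<String, Integer>> withCombineLatest() {
        return Flux.combineLatest(slowMasterdataA(), fastMasterdataB(), Tuples::of)
                .collectList()
                .block();
    }

    // Lossless alternative: cache A so it is fetched only once,
    // then pair every B record with the cached value.
    static List<Tuple2<String, Integer>> withCachedMono() {
        Mono<String> cachedA = slowMasterdataA().cache();
        return fastMasterdataB()
                .flatMap(b -> cachedA.map(a -> Tuples.of(a, b)))
                .collectList()
                .block();
    }
}
```

Here withCombineLatest() yields only (A,3), because 1 and 2 were overwritten before "A" arrived, while withCachedMono() yields all three pairs (in no guaranteed order, since flatMap may interleave them).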
Question 2
Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
    // Yes, delayElements works for not overwhelming the web services: it spaces out the requests,
    // but on its own it does not cap how many calls are pending at the same time
    // 500 ms is an arbitrary value; test and tune the optimal value for these services
    .delayElements(Duration.ofMillis(500))
    .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
        Mono<Data_A> monoA = webservice.getMonoData_A(tuple2.getT1());
        Mono<Data_B> monoB = webservice.getMonoData_B(tuple2.getT2());
        Mono<Data_C> monoC = webservice.getMonoData_C(tuple2.getT1(), tuple2.getT2());
        // wait for the results of all Monos
        return Mono.zip(monoA, monoB, monoC);
    },
    // flatMap accepts a maximum concurrency: at most this many inner Monos
    // (each one a zip of three web service calls) are subscribed at the same time
    // 5 is also an arbitrary value; test and find the best value for these services
    5)
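To check that the second flatMap argument really caps the number of pending calls, here is a minimal sketch assuming reactor-core is on the classpath; fakeCall is a hypothetical stand-in for one zipped web service call that takes 50 ms and records how many calls are running at the same time.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class BoundedConcurrencySketch {

    // Hypothetical web service call that tracks the number of calls in flight.
    static Mono<Integer> fakeCall(int i, AtomicInteger inFlight, AtomicInteger maxInFlight) {
        return Mono.fromCallable(() -> {
                    // Runs on subscription: one more call is now pending.
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    return i;
                })
                // Simulated network latency.
                .delayElement(Duration.ofMillis(50))
                // Runs before completion reaches flatMap, so the next call is
                // only started after this one has been counted as finished.
                .doOnTerminate(inFlight::decrementAndGet);
    }

    // Pushes 20 calls through flatMap with the given concurrency and returns
    // the highest number of simultaneous calls that was observed.
    static int maxObservedInFlight(int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        Flux.range(1, 20)
                .flatMap(i -> fakeCall(i, inFlight, maxInFlight), concurrency)
                .blockLast();
        return maxInFlight.get();
    }
}
```

maxObservedInFlight(5) should report 5 and maxObservedInFlight(1) reports 1, no matter how many elements are processed; this is the "at most N pending calls" guarantee the question asks for, which delayElements alone does not give.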
Question 3
Have a look at this:
https://github.com/reactor/reactive-streams-commons/issues/21
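One common way to keep such a pipeline readable, sketched here under stated assumptions rather than as the answer's own method: give each step a named method that returns a Mono or Flux, keep the values needed later in scope by closing over them (no nested subscribe), and let processData return a Mono<Void> so the caller decides when to subscribe. The records, the Webservice interface and fakeWebservice below are hypothetical simplifications of the question's types; the sketch assumes Java 16+ (records) and reactor-core on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class PipelineSteps {

    // Hypothetical simplified domain types.
    record MasterdataA(String id) {}
    record RecordB(int id) {}
    record Enrichment(String dataA, String dataB, String dataC) {}
    record ProcessingResult(MasterdataA a, RecordB b, Enrichment data) {}

    // Hypothetical webservice: every call returns a publisher, nothing subscribes here.
    interface Webservice {
        Mono<MasterdataA> masterdataA();
        Flux<RecordB> masterdataB();
        Mono<String> dataA(MasterdataA a);
        Mono<String> dataB(RecordB b);
        Mono<String> dataC(MasterdataA a, RecordB b);
    }

    // Hypothetical in-memory implementation, only used to exercise the pipeline.
    static Webservice fakeWebservice() {
        return new Webservice() {
            public Mono<MasterdataA> masterdataA() { return Mono.just(new MasterdataA("A")); }
            public Flux<RecordB> masterdataB() { return Flux.range(1, 3).map(RecordB::new); }
            public Mono<String> dataA(MasterdataA a) { return Mono.just("dataA-" + a.id()); }
            public Mono<String> dataB(RecordB b) { return Mono.just("dataB-" + b.id()); }
            public Mono<String> dataC(MasterdataA a, RecordB b) { return Mono.just("dataC-" + a.id() + b.id()); }
        };
    }

    private final Webservice ws;
    final List<ProcessingResult> saved = new CopyOnWriteArrayList<>();

    PipelineSteps(Webservice ws) { this.ws = ws; }

    // The top level reads as a list of named steps and returns Mono<Void>,
    // so the caller (or a test) decides when to subscribe.
    Mono<Void> processData(int concurrency) {
        Mono<MasterdataA> cachedA = ws.masterdataA().cache();
        return ws.masterdataB()
                .flatMap(b -> cachedA.flatMap(a -> enrich(a, b)), concurrency)
                .concatMap(this::save)
                .then();
    }

    // Step: fetch the extra data; a and b stay in scope through the closure.
    private Mono<ProcessingResult> enrich(MasterdataA a, RecordB b) {
        return Mono.zip(ws.dataA(a), ws.dataB(b), ws.dataC(a, b))
                .map(t -> new ProcessingResult(a, b, new Enrichment(t.getT1(), t.getT2(), t.getT3())));
    }

    // Step: persistence, kept separate so it can later be replaced by a batch save.
    private Mono<Void> save(ProcessingResult result) {
        return Mono.fromRunnable(() -> saved.add(result));
    }
}
```

Fetching one more piece of data later then means adding one more Mono to the zip in enrich (typed Mono.zip overloads go up to eight sources) or one more named step, while processData keeps the same flat shape. A caller can run it with new PipelineSteps(PipelineSteps.fakeWebservice()).processData(2).block().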
Complete example
Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();
// Suppose getMonoMasterdata_A returns just "A" and getFluxMasterdata_B returns [1, 2, 3, ...].
// If "A" arrives before the first B record, the result will be [(A,1), (A,2), (A,3), ...];
// combineLatest only keeps the latest value of each source, so earlier B records would be overwritten.
// Both sources are subscribed at the same time, so monoMasterdata_a and masterdataRecordBFlux run in parallel.
Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
    // delayElements spaces out the requests so the web services are not overwhelmed
    // 500 ms is an arbitrary value; test and tune the optimal value for these services
    .delayElements(Duration.ofMillis(500))
    .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
        Masterdata_A masterdataA = tuple2.getT1();
        MasterdataRecord_B masterdataB = tuple2.getT2();
        Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
        Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
        Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
        // wait for the results of all Monos; the master data is zipped in as well
        // so that it is still available when the result is built
        return Mono.zip(Mono.just(masterdataA), Mono.just(masterdataB), monoA, monoB, monoC);
    },
    // flatMap accepts a maximum concurrency (here: at most 5 zips in flight)
    // 5 is also an arbitrary value; test and find the best value for these services
    5)
    .map(data -> {
        // For a synchronous transformation you don't need flatMap, which is comparatively
        // expensive because it subscribes to an inner publisher per element; map is the right choice
        Masterdata_A masterdataA = data.getT1();
        MasterdataRecord_B masterdataB = data.getT2();
        Data_A dataA = data.getT3();
        Data_B dataB = data.getT4();
        Data_C dataC = data.getT5();
        // create result from masterdataA, masterdataB, dataA, dataB, dataC
        ProcessingResult result = ???;
        return result;
    })
    // saving in batches is usually more efficient than saving one result at a time
    // 100 is an arbitrary value; choose one that suits your data source
    .bufferTimeout(100, Duration.ofMillis(100))
    .concatMap(processingResults ->
        // batchSave is assumed to be a blocking method (e.g. JDBC) returning void, so wrap it in
        // fromRunnable to defer it until subscription and run that subscription on boundedElastic
        Mono.fromRunnable(() -> batchSave(processingResults))
            .subscribeOn(Schedulers.boundedElastic()))
    .subscribe();
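The batching tail of the complete example can be tried in isolation. A minimal sketch, assuming reactor-core is on the classpath; BatchSaveSketch and its batchSave method are hypothetical, with batchSave standing in for a blocking persistence call such as a JDBC batch insert. bufferTimeout groups results into lists, and concatMap saves one batch at a time on the boundedElastic scheduler.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class BatchSaveSketch {

    // Hypothetical blocking persistence call, e.g. a JDBC batch insert.
    static void batchSave(List<Integer> batch, List<List<Integer>> store) {
        store.add(batch);
    }

    // Groups `items` results into batches of at most `batchSize` elements
    // (or whatever arrived within 100 ms) and saves one batch at a time.
    static List<List<Integer>> run(int items, int batchSize) {
        List<List<Integer>> store = new CopyOnWriteArrayList<>();
        Flux.range(1, items)
                .bufferTimeout(batchSize, Duration.ofMillis(100))
                // fromRunnable defers the blocking call until subscription;
                // subscribeOn moves that subscription onto boundedElastic.
                .concatMap(batch -> Mono.fromRunnable(() -> batchSave(batch, store))
                        .subscribeOn(Schedulers.boundedElastic()))
                .blockLast();
        return store;
    }
}
```

run(250, 100) stores batches of 100, 100 and 50 items, in order, because concatMap waits for each save to finish before starting the next one. Calling a blocking batchSave directly inside concatMap, without the fromRunnable wrapper, would run it on whichever thread emitted the batch, and subscribeOn could not move it.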