Spring 反应式 - 重用 Mono 值
Spring Reactive - reuse Mono value
我有一系列使用 flatMap
的 Mono
转换。我设法将我的生产代码简化为这个测试用例:
@Test
public void test() {
AtomicInteger iCounter = new AtomicInteger(1);
Mono<String> iValueMono = Mono.fromSupplier(() -> {
int iValue = iCounter.getAndIncrement();
System.out.println("iValueMono CALL: " + iValue);
return String.valueOf(iValue);
});
Mono<String> resultMono = Mono.just("X")
.flatMap(append(iValueMono))
.flatMap(append(iValueMono));
StepVerifier.create(resultMono)
.consumeNextWith(result -> assertThat(result).isEqualTo("X11"))
.expectComplete()
.verify();
}
private Function<String, Mono<String>> append(Mono<String> sMono) {
return s -> sMono.map(v -> s + v);
}
这会打印:
iValueMono CALL: 1
iValueMono CALL: 2
org.junit.ComparisonFailure:
Expected :"X11"
Actual :"X12"
我想 - 我现在知道这是不正确的 - 每次我在 append()
调用中映射 iValueMono
时,供应商都会重新执行以产生一个新值。我无法在生产代码中更改 iValueMono
的实现方式(例如,使其有状态地存储值)。我怎样才能实现这一点,以便价值供应商只被调用一次,我得到最终结果 "X11"?
当然,我对一种非阻塞的反应式方法很感兴趣。
我重写了你的测试,现在 iValueMono 似乎只执行一次:
@Test
public void test() {
AtomicInteger iCounter = new AtomicInteger(0);
Mono<String> iValueMono = getMono(iCounter.incrementAndGet());
Mono<String> resultMono = Mono.just("X")
.flatMap(append(iValueMono))
.flatMap(append(iValueMono));
StepVerifier.create(resultMono)
.consumeNextWith(result -> assertEquals(result, "X11"))
.expectComplete()
.verify();
}
private Mono<String> getMono(int x) {
System.out.println("Called");
return Mono.just(String.valueOf(x));
}
你怎么看?有帮助吗?
使用Mono.cache()
就是答案:
Turn this Mono into a hot source and cache last emitted signals for further Subscriber.
使用它:
Mono<String> iValueMono = Mono.fromSupplier(() -> {
int iValue = iCounter.getAndIncrement();
System.out.println("iValueMono CALL: " + iValue);
return String.valueOf(iValue);
}).cache();
仅调用供应商一次即可提供所需的结果。
我有一系列使用 flatMap
的 Mono
转换。我设法将我的生产代码简化为这个测试用例:
@Test
public void test() {
AtomicInteger iCounter = new AtomicInteger(1);
Mono<String> iValueMono = Mono.fromSupplier(() -> {
int iValue = iCounter.getAndIncrement();
System.out.println("iValueMono CALL: " + iValue);
return String.valueOf(iValue);
});
Mono<String> resultMono = Mono.just("X")
.flatMap(append(iValueMono))
.flatMap(append(iValueMono));
StepVerifier.create(resultMono)
.consumeNextWith(result -> assertThat(result).isEqualTo("X11"))
.expectComplete()
.verify();
}
private Function<String, Mono<String>> append(Mono<String> sMono) {
return s -> sMono.map(v -> s + v);
}
这会打印:
iValueMono CALL: 1
iValueMono CALL: 2
org.junit.ComparisonFailure:
Expected :"X11"
Actual :"X12"
我想 - 我现在知道这是不正确的 - 每次我在 append()
调用中映射 iValueMono
时,供应商都会重新执行以产生一个新值。我无法在生产代码中更改 iValueMono
的实现方式(例如,使其有状态地存储值)。我怎样才能实现这一点,以便价值供应商只被调用一次,我得到最终结果 "X11"?
当然,我对一种非阻塞的反应式方法很感兴趣。
我重写了你的测试,现在 iValueMono 似乎只执行一次:
@Test
public void test() {
AtomicInteger iCounter = new AtomicInteger(0);
Mono<String> iValueMono = getMono(iCounter.incrementAndGet());
Mono<String> resultMono = Mono.just("X")
.flatMap(append(iValueMono))
.flatMap(append(iValueMono));
StepVerifier.create(resultMono)
.consumeNextWith(result -> assertEquals(result, "X11"))
.expectComplete()
.verify();
}
private Mono<String> getMono(int x) {
System.out.println("Called");
return Mono.just(String.valueOf(x));
}
你怎么看?有帮助吗?
使用Mono.cache()
就是答案:
Turn this Mono into a hot source and cache last emitted signals for further Subscriber.
使用它:
Mono<String> iValueMono = Mono.fromSupplier(() -> {
int iValue = iCounter.getAndIncrement();
System.out.println("iValueMono CALL: " + iValue);
return String.valueOf(iValue);
}).cache();
仅调用供应商一次即可提供所需的结果。