Mono.defer() 是做什么的?
what does Mono.defer() do?
我在一些 Spring webflux 代码中遇到过 Mono.defer()
我在文档中查找了方法,但看不懂解释:
"创建一个 Mono 提供者,它将提供一个要订阅的目标 Mono
对于下游的每个订阅者
请给我一个解释和一个例子。有没有我可以参考的一堆 Reactor 示例代码(他们的单元测试?)的地方。
谢谢
用简单的话
如果您在第一个视图中看到它就像 Mono.just() 但不是。
当你 运行 Mono.just() 它会立即创建一个 Observable(Mono) 并重用它但是当你使用 defer 它不会立即创建它它会在每个订阅中创建一个新的 Observable。
一个用例看区别
int a = 5;
@Override
public void run(String... args) throws Exception {
Mono<Integer> monoJust = Mono.just(a);
Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));
monoJust.subscribe(integer1 -> System.out.println(integer1));
monoDefer.subscribe(integer1 -> System.out.println(integer1));
a = 7;
monoJust.subscribe(integer1 -> System.out.println(integer1));
monoDefer.subscribe(integer1 -> System.out.println(integer1));
}
打印:
5
5
5
7
如果你看到 mono.just 立即创建了 observable 并且即使值已更改它也不会更改但是 defer 在 subscribe 中创建 observable 因此你将使用当前的 onSubscribe 值
这有点过于简单化了,但从概念上讲,Reactor 源要么是懒惰的,要么是急切的。更高级的请求,如 HTTP 请求,预计将被惰性评估。另一方面,最简单的 Mono.just
或 Flux.fromIterable
是急切的。
我的意思是调用 Mono.just(System.currentTimeMillis())
将立即调用 currentTimeMillis()
方法并捕获结果。一旦订阅,Mono
只会 发出 所述结果。多次订阅也不会改变值:
Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0
Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0
Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0
defer
运算符是为了让这个源变得懒惰,重新评估 lambda 的内容每次有新订阅者:
Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0
Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10
Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17
我正在尝试 defer
不同的用例。编写以下代码进行检查和分享,因为它可能会帮助其他人。我的用例是链接两个 Mono
s 并确保第一个在第二个被占用之前完成。第二个包含一个阻塞调用,其结果用于使用 empty
或 error
响应来响应 Mono
。没有 defer
,无论第一个结果如何,我的阻塞调用都会执行。但是在使用 defer
时,阻塞调用仅在第一个 Mono
完成时执行。代码如下:
public static void main(String[] args) {
long cur = System.currentTimeMillis();
boolean succeed = true;
Mono<Integer> monoJust = Mono.create(consumer -> {
System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
if (succeed) {
consumer.success(1);
} else {
consumer.error(new RuntimeException("aaa"));
}
});
Mono<String> monoJustStr = Mono.create(consumer -> {
System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
consumer.success("one");
});
System.out.println("##1##: Begin");
monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));
System.out.println("\n\n\n##2##: Begin");
monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));
}
private static boolean evaluator() {
System.out.println("Inside Evaluator");
return false;
}
输出 succeed=true
- 观察 "Inside Evaluator" 和 "MonoJust inside"
的顺序
##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542
##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544
下面是 succeed = false
的输出 - 注意没有调用评估器。
##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567
##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569
初学者的简单答案:
当在 monoJust 变量上调用订阅时,它将打印一个随机整数三次。但是在 monoDefer 变量上调用订阅时,它每次都可以打印一个随机数。
Mono<Integer> justMono = Mono.just((new Random()).nextInt(10));
//this will print same random number thrice
for(int i=0;i<3;i++)
justMono.subscribe(x -> {System.out.println("Just Mono: " + x);});
Mono<Integer> deferMono = Mono.defer(() -> Mono.just((new Random()).nextInt(10)));
//this might print three different random numbers
for(int i=0;i<3;i++)
deferMono.subscribe(x -> {System.out.println("Defer Mono: " + x);});
在 Mono.just() 中,实例化仅在第一次订阅发生时发生一次。在 Mono.defer() 中,每次调用订阅时都会发生实例化。
更多参考请查看:
https://www.youtube.com/watch?v=eupNfdKMFL4&t=381s 在 3:15 分钟
我在一些 Spring webflux 代码中遇到过 Mono.defer()
我在文档中查找了方法,但看不懂解释:
"创建一个 Mono 提供者,它将提供一个要订阅的目标 Mono 对于下游的每个订阅者
请给我一个解释和一个例子。有没有我可以参考的一堆 Reactor 示例代码(他们的单元测试?)的地方。
谢谢
用简单的话 如果您在第一个视图中看到它就像 Mono.just() 但不是。 当你 运行 Mono.just() 它会立即创建一个 Observable(Mono) 并重用它但是当你使用 defer 它不会立即创建它它会在每个订阅中创建一个新的 Observable。
一个用例看区别
int a = 5;
@Override
public void run(String... args) throws Exception {
Mono<Integer> monoJust = Mono.just(a);
Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));
monoJust.subscribe(integer1 -> System.out.println(integer1));
monoDefer.subscribe(integer1 -> System.out.println(integer1));
a = 7;
monoJust.subscribe(integer1 -> System.out.println(integer1));
monoDefer.subscribe(integer1 -> System.out.println(integer1));
}
打印:
5
5
5
7
如果你看到 mono.just 立即创建了 observable 并且即使值已更改它也不会更改但是 defer 在 subscribe 中创建 observable 因此你将使用当前的 onSubscribe 值
这有点过于简单化了,但从概念上讲,Reactor 源要么是懒惰的,要么是急切的。更高级的请求,如 HTTP 请求,预计将被惰性评估。另一方面,最简单的 Mono.just
或 Flux.fromIterable
是急切的。
我的意思是调用 Mono.just(System.currentTimeMillis())
将立即调用 currentTimeMillis()
方法并捕获结果。一旦订阅,Mono
只会 发出 所述结果。多次订阅也不会改变值:
Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0
Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0
Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0
defer
运算符是为了让这个源变得懒惰,重新评估 lambda 的内容每次有新订阅者:
Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0
Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10
Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17
我正在尝试 defer
不同的用例。编写以下代码进行检查和分享,因为它可能会帮助其他人。我的用例是链接两个 Mono
s 并确保第一个在第二个被占用之前完成。第二个包含一个阻塞调用,其结果用于使用 empty
或 error
响应来响应 Mono
。没有 defer
,无论第一个结果如何,我的阻塞调用都会执行。但是在使用 defer
时,阻塞调用仅在第一个 Mono
完成时执行。代码如下:
public static void main(String[] args) {
long cur = System.currentTimeMillis();
boolean succeed = true;
Mono<Integer> monoJust = Mono.create(consumer -> {
System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
if (succeed) {
consumer.success(1);
} else {
consumer.error(new RuntimeException("aaa"));
}
});
Mono<String> monoJustStr = Mono.create(consumer -> {
System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
consumer.success("one");
});
System.out.println("##1##: Begin");
monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));
System.out.println("\n\n\n##2##: Begin");
monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));
}
private static boolean evaluator() {
System.out.println("Inside Evaluator");
return false;
}
输出 succeed=true
- 观察 "Inside Evaluator" 和 "MonoJust inside"
##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542
##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544
下面是 succeed = false
的输出 - 注意没有调用评估器。
##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567
##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569
初学者的简单答案:
当在 monoJust 变量上调用订阅时,它将打印一个随机整数三次。但是在 monoDefer 变量上调用订阅时,它每次都可以打印一个随机数。
Mono<Integer> justMono = Mono.just((new Random()).nextInt(10));
//this will print same random number thrice
for(int i=0;i<3;i++)
justMono.subscribe(x -> {System.out.println("Just Mono: " + x);});
Mono<Integer> deferMono = Mono.defer(() -> Mono.just((new Random()).nextInt(10)));
//this might print three different random numbers
for(int i=0;i<3;i++)
deferMono.subscribe(x -> {System.out.println("Defer Mono: " + x);});
在 Mono.just() 中,实例化仅在第一次订阅发生时发生一次。在 Mono.defer() 中,每次调用订阅时都会发生实例化。
更多参考请查看: https://www.youtube.com/watch?v=eupNfdKMFL4&t=381s 在 3:15 分钟