使用带有动态参数的 Mono 方法
Using Mono methods with dynamic parameters
我有一个函数 return 输入为单声道:
public static Mono<Integer> emitter(int param){
return Mono.just(param)
.delayElement(Duration.ofMillis(100)); //delay to simulate http response
}
我想用初始值 3 调用发射器一次,然后重复调用直到达到特定大小。这个重复逻辑应该在 main 方法中,所以我不能修改 emitter()
.
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
Mono<Integer> response = emitter(initial);
response
.doOnNext(s -> {
System.out.println("need more!");
})
.subscribe();
}
一个天真的解决方案是:
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
for(int i = 0; i < 999; i++) {
Mono<Integer> response = emitter(initial+i);
Mono<Boolean> isDone = response
.flatMap(elem -> {
if(elem < maxSize) {
System.out.println("need more!");
return Mono.just(false);
} else {
System.out.println("ok done!");
return Mono.just(true);
}
});
if(isDone.block())
break;
}
}
基本上,我正在尝试根据前一个 Mono 的结果创建另一个具有动态参数的 Mono。我知道 Mono/Flux 是不可变的......
有没有一种简洁而反应性的方式来做到这一点?
我试过 Flux.range(0, Integer.MAX_VALUE).zipWith(myMono)
之类的东西来尝试将参数输入发射器,但无法使其工作。
PS。我知道我的例子没有多大意义。我试图简化涉及列表和 spring WebFlux(emitter).
的现实世界场景
谢谢!
---编辑
好的,这是我想出的:
public static void main(String[] args) throws InterruptedException {
int maxSize = 5;
int initial = 3;
Flux.range(initial, 10)
.delayElements(Duration.ofSeconds(1))
.flatMap(param -> emitter(param))
.flatMap(it -> {
if(it < maxSize) {
System.out.println("need more!: " + it);
return Mono.just(false);
} else {
System.out.println("done!: " + it);
return Mono.just(true);
}
})
.takeUntil(Boolean::booleanValue)
.subscribe();
Thread.sleep(6000);
}
need more!: 3
need more!: 4
done!: 5
一个问题是,如果我不延迟Flux.range
,执行不按顺序执行,输出的打印语句可能多于或少于预期的3行。
你可以使用Publisher的expand
函数,它的作用类似于递归
例如
emitter(initial)
.expand(i -> i < maxSize ? emitter(i + 1) : Mono.empty())
.doOnNext(i -> System.out.println("i = " + i))
.subscribe();
我有一个函数 return 输入为单声道:
public static Mono<Integer> emitter(int param){
return Mono.just(param)
.delayElement(Duration.ofMillis(100)); //delay to simulate http response
}
我想用初始值 3 调用发射器一次,然后重复调用直到达到特定大小。这个重复逻辑应该在 main 方法中,所以我不能修改 emitter()
.
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
Mono<Integer> response = emitter(initial);
response
.doOnNext(s -> {
System.out.println("need more!");
})
.subscribe();
}
一个天真的解决方案是:
public static void main(String[] args){
int maxSize = 5;
int initial = 3;
for(int i = 0; i < 999; i++) {
Mono<Integer> response = emitter(initial+i);
Mono<Boolean> isDone = response
.flatMap(elem -> {
if(elem < maxSize) {
System.out.println("need more!");
return Mono.just(false);
} else {
System.out.println("ok done!");
return Mono.just(true);
}
});
if(isDone.block())
break;
}
}
基本上,我正在尝试根据前一个 Mono 的结果创建另一个具有动态参数的 Mono。我知道 Mono/Flux 是不可变的......
有没有一种简洁而反应性的方式来做到这一点?
我试过 Flux.range(0, Integer.MAX_VALUE).zipWith(myMono)
之类的东西来尝试将参数输入发射器,但无法使其工作。
PS。我知道我的例子没有多大意义。我试图简化涉及列表和 spring WebFlux(emitter).
的现实世界场景谢谢!
---编辑
好的,这是我想出的:
public static void main(String[] args) throws InterruptedException {
int maxSize = 5;
int initial = 3;
Flux.range(initial, 10)
.delayElements(Duration.ofSeconds(1))
.flatMap(param -> emitter(param))
.flatMap(it -> {
if(it < maxSize) {
System.out.println("need more!: " + it);
return Mono.just(false);
} else {
System.out.println("done!: " + it);
return Mono.just(true);
}
})
.takeUntil(Boolean::booleanValue)
.subscribe();
Thread.sleep(6000);
}
need more!: 3 need more!: 4 done!: 5
一个问题是,如果我不延迟Flux.range
,执行不按顺序执行,输出的打印语句可能多于或少于预期的3行。
你可以使用Publisher的expand
函数,它的作用类似于递归
例如
emitter(initial)
.expand(i -> i < maxSize ? emitter(i + 1) : Mono.empty())
.doOnNext(i -> System.out.println("i = " + i))
.subscribe();