在 Java 中迭代结果直到达到 Project Reactor 的条件
Iterating on results up to a condition with Project Reactor in Java
我需要使用 Project Reactor 中的 Flux and/or Mono 迭代数据存储的结果,直到发现没有结果符合我的条件。我有一个解决方案,但它使用错误来控制流程。我真的觉得应该有一个更清洁的解决方案,但如果有的话,我找不到。有没有比这更好的方法来实现我想要的:
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
class Scratch {
public static void main(String[] args) {
Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
.map(count -> "FooBar " + count)
.flatMap(string -> searchExisting(string)
.flatMap(exists -> Mono.<String>error(KeyExistsException::new))
.switchIfEmpty(Mono.just(string)))
.retry(t -> t instanceof KeyExistsException);
System.out.println("Result: " + mono.block());
}
private static Mono<String> searchExisting(String test) {
System.out.println("In searchExisting: " + test);
return Mono.fromCompletionStage(getCompletableFuture(test));
}
private static final List<String> available = asList(
"FooBar 1", "FooBar 2", "FooBar 3", "FooBar 4", "FooBar 5"
);
private static CompletableFuture<String> getCompletableFuture(String test) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("In CompletableFuture: " + test);
if (available.contains(test)) {
return test;
}
System.out.println("Found no entry");
return null;
});
}
}
class KeyExistsException extends RuntimeException {
}
我试过使用 Flux.fromStream(Stream.iterate(1, x -> x + 1))
并从那里开始,但在这种情况下,我仍然需要基于错误的流量控制,我无法控制 Flux
发出数据,这意味着当我只查找一些数据存储时,我获得了很多对数据存储的访问,直到我找到缺失值。
这感觉像是一个足够常见的用例。我错过了什么?
--- 编辑 ---
根据 Bartosz 的建议,替代实现如下所示:
Mono<String> mono = Flux.fromStream(Stream.iterate(1, x -> x + 1))
.map(count -> "FooBar " + count)
.flatMap(newKey -> Mono.zip(Mono.just(newKey), searchExisting(newKey)))
.takeUntil(tuple -> !tuple.getT2().isPresent())
.map(Tuple2::getT1)
.last();
很多更好,除了它不提供对Flux
发出的事件数量的任何控制 - 你得到很多次尝试找到正确的值,远远超过你的需要,因为takeUntil
不会让Flux
变得懒惰...
如果您愿意使用 Optional
s,您可以结合您的尝试并做类似
的事情
Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
.map(count -> "FooBar " + count)
.flatMap(newKey -> Mono.zip(Mono.just(newKey),
searchExisting(newKey)
.map(Optional::ofNullable)
.defaultIfEmpty(Optional.empty())))
.repeat()
.takeUntil(tuple -> tuple.getT2().isEmpty())
.map(Tuple2::getT1)
.last();
System.out.println("Result: " + mono.block());
你正在尝试做的事情的主要问题是空 Mono
s 被几乎所有东西都忽略了 bar switchIfEmpty()
意思是 none 运算符,例如 [=14] =] 甚至会看到他们。
通常这是您想要的,但在您的情况下,您想要对空的 Mono 进行操作。
将它包装成 Optional
允许它仍然被处理。
我需要使用 Project Reactor 中的 Flux and/or Mono 迭代数据存储的结果,直到发现没有结果符合我的条件。我有一个解决方案,但它使用错误来控制流程。我真的觉得应该有一个更清洁的解决方案,但如果有的话,我找不到。有没有比这更好的方法来实现我想要的:
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
class Scratch {
public static void main(String[] args) {
Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
.map(count -> "FooBar " + count)
.flatMap(string -> searchExisting(string)
.flatMap(exists -> Mono.<String>error(KeyExistsException::new))
.switchIfEmpty(Mono.just(string)))
.retry(t -> t instanceof KeyExistsException);
System.out.println("Result: " + mono.block());
}
private static Mono<String> searchExisting(String test) {
System.out.println("In searchExisting: " + test);
return Mono.fromCompletionStage(getCompletableFuture(test));
}
private static final List<String> available = asList(
"FooBar 1", "FooBar 2", "FooBar 3", "FooBar 4", "FooBar 5"
);
private static CompletableFuture<String> getCompletableFuture(String test) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("In CompletableFuture: " + test);
if (available.contains(test)) {
return test;
}
System.out.println("Found no entry");
return null;
});
}
}
class KeyExistsException extends RuntimeException {
}
我试过使用 Flux.fromStream(Stream.iterate(1, x -> x + 1))
并从那里开始,但在这种情况下,我仍然需要基于错误的流量控制,我无法控制 Flux
发出数据,这意味着当我只查找一些数据存储时,我获得了很多对数据存储的访问,直到我找到缺失值。
这感觉像是一个足够常见的用例。我错过了什么?
--- 编辑 ---
根据 Bartosz 的建议,替代实现如下所示:
Mono<String> mono = Flux.fromStream(Stream.iterate(1, x -> x + 1))
.map(count -> "FooBar " + count)
.flatMap(newKey -> Mono.zip(Mono.just(newKey), searchExisting(newKey)))
.takeUntil(tuple -> !tuple.getT2().isPresent())
.map(Tuple2::getT1)
.last();
很多更好,除了它不提供对Flux
发出的事件数量的任何控制 - 你得到很多次尝试找到正确的值,远远超过你的需要,因为takeUntil
不会让Flux
变得懒惰...
如果您愿意使用 Optional
s,您可以结合您的尝试并做类似
Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
.map(count -> "FooBar " + count)
.flatMap(newKey -> Mono.zip(Mono.just(newKey),
searchExisting(newKey)
.map(Optional::ofNullable)
.defaultIfEmpty(Optional.empty())))
.repeat()
.takeUntil(tuple -> tuple.getT2().isEmpty())
.map(Tuple2::getT1)
.last();
System.out.println("Result: " + mono.block());
你正在尝试做的事情的主要问题是空 Mono
s 被几乎所有东西都忽略了 bar switchIfEmpty()
意思是 none 运算符,例如 [=14] =] 甚至会看到他们。
通常这是您想要的,但在您的情况下,您想要对空的 Mono 进行操作。
将它包装成 Optional
允许它仍然被处理。