在 Java 中迭代结果直到达到 Project Reactor 的条件

Iterating on results up to a condition with Project Reactor in Java

我需要使用 Project Reactor 中的 Flux and/or Mono 迭代数据存储的结果,直到发现没有结果符合我的条件。我有一个解决方案,但它使用错误来控制流程。我真的觉得应该有一个更清洁的解决方案,但如果有的话,我找不到。有没有比这更好的方法来实现我想要的:

import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Arrays.asList;

class Scratch {
    public static void main(String[] args) {
        Mono<String> mono = Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
                .map(count -> "FooBar " + count)
                .flatMap(string -> searchExisting(string)
                        .flatMap(exists -> Mono.<String>error(KeyExistsException::new))
                        .switchIfEmpty(Mono.just(string)))
                .retry(t -> t instanceof KeyExistsException);

        System.out.println("Result: " + mono.block());
    }

    private static Mono<String> searchExisting(String test) {
        System.out.println("In searchExisting: " + test);
        return Mono.fromCompletionStage(getCompletableFuture(test));
    }

    private static final List<String> available = asList(
            "FooBar 1", "FooBar 2", "FooBar 3", "FooBar 4", "FooBar 5"
    );

    private static CompletableFuture<String> getCompletableFuture(String test) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("In CompletableFuture: " + test);

            if (available.contains(test)) {
                return test;
            }

            System.out.println("Found no entry");
            return null;
        });
    }
}

class KeyExistsException extends RuntimeException {

}

我试过使用 Flux.fromStream(Stream.iterate(1, x -> x + 1)) 并从那里开始,但在这种情况下,我仍然需要基于错误的流量控制,我无法控制 Flux 发出数据,这意味着当我只查找一些数据存储时,我获得了很多对数据存储的访问,直到我找到缺失值。

这感觉像是一个足够常见的用例。我错过了什么?

--- 编辑 ---

根据 Bartosz 的建议,替代实现如下所示:

Mono<String> mono = Flux.fromStream(Stream.iterate(1, x -> x + 1))
                .map(count -> "FooBar " + count)
                .flatMap(newKey -> Mono.zip(Mono.just(newKey), searchExisting(newKey)))
                .takeUntil(tuple -> !tuple.getT2().isPresent())
                .map(Tuple2::getT1)
                .last();

很多更好,除了它不提供对Flux发出的事件数量的任何控制 - 你得到很多次尝试找到正确的值,远远超过你的需要,因为takeUntil不会让Flux变得懒惰...

如果您愿意使用 Optionals,您可以结合您的尝试并做类似

的事情
    Mono<String> mono =  Mono.fromSupplier(new AtomicInteger(0)::incrementAndGet)
            .map(count -> "FooBar " + count)
            .flatMap(newKey -> Mono.zip(Mono.just(newKey), 
                                           searchExisting(newKey)
                                               .map(Optional::ofNullable)
                                               .defaultIfEmpty(Optional.empty())))
            .repeat()
            .takeUntil(tuple -> tuple.getT2().isEmpty())
            .map(Tuple2::getT1)
            .last();

    System.out.println("Result: " + mono.block());

你正在尝试做的事情的主要问题是空 Monos 被几乎所有东西都忽略了 bar switchIfEmpty() 意思是 none 运算符,例如 [=14] =] 甚至会看到他们。

通常这是您想要的,但在您的情况下,您想要对空的 Mono 进行操作。 将它包装成 Optional 允许它仍然被处理。