将多个反应性 Publisher/Flux/Mono 对象组合成一个阻塞请求

Composing multiple reactive Publisher/Flux/Mono objects into a blocking request

我对反应世界还很陌生,很难理解如何完成任务。我正在处理一个遗留项目,我必须在其中实现一个接口,该接口具有许多方法来从 redis 查询各种对象。有时查询就像按 ID 查询哈希一样简单,因此只需调用 redis 一次即可获取哈希。其他时候,我可能需要先根据一些参数从 Redis 集中查找 ID,然后使用结果 ID 获取哈希值。我在 Spring 引导应用程序中使用 Reactor 3.1.0.M3 和 Lettuce 5.0.0.RC1。

这两个示例方法的现有代码如下所示:

    public <T extends CatalogInfo> T get(String id, Class<T> clazz) {
        String result = (String)repository.getRedisHashRepository().getHashById(CatalogUtils.root(clazz).getSimpleName(), id);
        if (null != result) {
            return serializer.fromData(result, clazz);
        }
        return null;

    }

    public <T extends CatalogInfo> T get(String attName, String attValue, Class<T> clazz) {

        String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);

        String id = CatalogUtils.getIdFromSet(repository.getRedisSetRepository().getSetMembers(attKey));
        if (id == null) {
            return null;
        }
        return get(id, clazz);
    }

那里有一些实用函数可以帮助我从 Class 名称构建我想用于 redis 的键,并确保存储在 redis 集中的 ID 是单个值。如您所见,在第二个 get 方法中,我使用集合中的结果调用第一个 get 方法。

用 Lettuce/Reactor 实现第一个方法很简单:

public <T extends CatalogInfo> Mono<String> getReactive(String id, Class<T> clazz, Publisher<String>... publisher) {
    Mono<String> mono = Mono.just(id).flatMap(new Function<String, Mono<String>>() {
        @Override
        public Mono<String> apply(String id) {
            return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
        }
    });
    return mono;
}

那时,我可以调用 mono.block() 来获取结果值。通过将两个 flatMaps/functions:

链接在一起,我可以很容易地获得第二个 get 方法的功能
public <T extends CatalogInfo> Flux<String> getIdFromSetReactive(String attName, String attValue, Class<T> clazz) {
    String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);

    Flux<String> flux = Flux.just(attKey).flatMap(new Function<String, Flux<String>>() {
        @Override
        public Flux<String> apply(String attKey) {
            return commands.smembers(attKey);
        }
    }).flatMap(new Function<String, Publisher<String>>() {
        @Override
        public Publisher<String> apply(String id) {
            return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
        }
    });
    return flux;
} 

我有许多不同类型的方法,可能需要多达 8 次调用 redis 才能完成。在我的原始代码中,我可以重复使用每个方法并从另一个方法调用一个方法,但我不知道如何使用 reactor 执行此操作。

我希望能够调用一个构建 Flux 的方法以从 redis 集合中获取 ID(我们称之为 fluxA),然后调用另一个构建 Flux 的方法以根据 ID 查询 redis 哈希(fluxB) 等

认为我可能需要将我可能需要的每个函数定义为成员变量,如下所示:

private Function<String, Flux<String>> getIdFromSetFunction = new Function<String, Flux<String>>() {
        @Override
        public Flux<String> apply(String attKey) {
            return commands.smembers(attKey);
        }
    }; 

然后打电话给

return Flux.just(attKey).flatMap(getIdFromSetFunction).flatMap(getHash);

唯一的问题是在这些函数中执行的代码需要 Class 当前在我的方法调用中可用的信息。但我不确定这是正确的方法。

如有任何建议,我们将不胜感激!

从概念上讲,您 "composing mono objects" 不如 "creating a new one for each step"。

Mono<String> a = Mono.just("something");
Mono<String> b = a.flatMap( s -> goDoSomethingElseThatReturnsAMono(s));

String result = b.block();

你可以随心所欲地保持这样的链接。 (或者如果您有多个要单独处理的数据项,请使用 Mono.flatMapMany 进入 Flux 世界)。

在调用 block 之前什么都不会发生,因为它会订阅 b 并阻塞当前线程,直到结果可用。