将多个反应性 Publisher/Flux/Mono 对象组合成一个阻塞请求
Composing multiple reactive Publisher/Flux/Mono objects into a blocking request
我对反应世界还很陌生,很难理解如何完成任务。我正在处理一个遗留项目,我必须在其中实现一个接口,该接口具有许多方法来从 redis 查询各种对象。有时查询就像按 ID 查询哈希一样简单,因此只需调用 redis 一次即可获取哈希。其他时候,我可能需要先根据一些参数从 Redis 集中查找 ID,然后使用结果 ID 获取哈希值。我在 Spring 引导应用程序中使用 Reactor 3.1.0.M3 和 Lettuce 5.0.0.RC1。
这两个示例方法的现有代码如下所示:
public <T extends CatalogInfo> T get(String id, Class<T> clazz) {
String result = (String)repository.getRedisHashRepository().getHashById(CatalogUtils.root(clazz).getSimpleName(), id);
if (null != result) {
return serializer.fromData(result, clazz);
}
return null;
}
public <T extends CatalogInfo> T get(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
String id = CatalogUtils.getIdFromSet(repository.getRedisSetRepository().getSetMembers(attKey));
if (id == null) {
return null;
}
return get(id, clazz);
}
那里有一些实用函数可以帮助我从 Class 名称构建我想用于 redis 的键,并确保存储在 redis 集中的 ID 是单个值。如您所见,在第二个 get 方法中,我使用集合中的结果调用第一个 get 方法。
用 Lettuce/Reactor 实现第一个方法很简单:
public <T extends CatalogInfo> Mono<String> getReactive(String id, Class<T> clazz, Publisher<String>... publisher) {
Mono<String> mono = Mono.just(id).flatMap(new Function<String, Mono<String>>() {
@Override
public Mono<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return mono;
}
那时,我可以调用 mono.block() 来获取结果值。通过将两个 flatMaps/functions:
链接在一起,我可以很容易地获得第二个 get 方法的功能
public <T extends CatalogInfo> Flux<String> getIdFromSetReactive(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
Flux<String> flux = Flux.just(attKey).flatMap(new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
}).flatMap(new Function<String, Publisher<String>>() {
@Override
public Publisher<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return flux;
}
我有许多不同类型的方法,可能需要多达 8 次调用 redis 才能完成。在我的原始代码中,我可以重复使用每个方法并从另一个方法调用一个方法,但我不知道如何使用 reactor 执行此操作。
我希望能够调用一个构建 Flux 的方法以从 redis 集合中获取 ID(我们称之为 fluxA),然后调用另一个构建 Flux 的方法以根据 ID 查询 redis 哈希(fluxB) 等
我认为我可能需要将我可能需要的每个函数定义为成员变量,如下所示:
private Function<String, Flux<String>> getIdFromSetFunction = new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
};
然后打电话给
return Flux.just(attKey).flatMap(getIdFromSetFunction).flatMap(getHash);
唯一的问题是在这些函数中执行的代码需要 Class 当前在我的方法调用中可用的信息。但我不确定这是正确的方法。
如有任何建议,我们将不胜感激!
从概念上讲,您 "composing mono objects" 不如 "creating a new one for each step"。
Mono<String> a = Mono.just("something");
Mono<String> b = a.flatMap( s -> goDoSomethingElseThatReturnsAMono(s));
String result = b.block();
你可以随心所欲地保持这样的链接。 (或者如果您有多个要单独处理的数据项,请使用 Mono.flatMapMany
进入 Flux
世界)。
在调用 block
之前什么都不会发生,因为它会订阅 b
并阻塞当前线程,直到结果可用。
我对反应世界还很陌生,很难理解如何完成任务。我正在处理一个遗留项目,我必须在其中实现一个接口,该接口具有许多方法来从 redis 查询各种对象。有时查询就像按 ID 查询哈希一样简单,因此只需调用 redis 一次即可获取哈希。其他时候,我可能需要先根据一些参数从 Redis 集中查找 ID,然后使用结果 ID 获取哈希值。我在 Spring 引导应用程序中使用 Reactor 3.1.0.M3 和 Lettuce 5.0.0.RC1。
这两个示例方法的现有代码如下所示:
public <T extends CatalogInfo> T get(String id, Class<T> clazz) {
String result = (String)repository.getRedisHashRepository().getHashById(CatalogUtils.root(clazz).getSimpleName(), id);
if (null != result) {
return serializer.fromData(result, clazz);
}
return null;
}
public <T extends CatalogInfo> T get(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
String id = CatalogUtils.getIdFromSet(repository.getRedisSetRepository().getSetMembers(attKey));
if (id == null) {
return null;
}
return get(id, clazz);
}
那里有一些实用函数可以帮助我从 Class 名称构建我想用于 redis 的键,并确保存储在 redis 集中的 ID 是单个值。如您所见,在第二个 get 方法中,我使用集合中的结果调用第一个 get 方法。
用 Lettuce/Reactor 实现第一个方法很简单:
public <T extends CatalogInfo> Mono<String> getReactive(String id, Class<T> clazz, Publisher<String>... publisher) {
Mono<String> mono = Mono.just(id).flatMap(new Function<String, Mono<String>>() {
@Override
public Mono<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return mono;
}
那时,我可以调用 mono.block() 来获取结果值。通过将两个 flatMaps/functions:
链接在一起,我可以很容易地获得第二个 get 方法的功能public <T extends CatalogInfo> Flux<String> getIdFromSetReactive(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
Flux<String> flux = Flux.just(attKey).flatMap(new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
}).flatMap(new Function<String, Publisher<String>>() {
@Override
public Publisher<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return flux;
}
我有许多不同类型的方法,可能需要多达 8 次调用 redis 才能完成。在我的原始代码中,我可以重复使用每个方法并从另一个方法调用一个方法,但我不知道如何使用 reactor 执行此操作。
我希望能够调用一个构建 Flux 的方法以从 redis 集合中获取 ID(我们称之为 fluxA),然后调用另一个构建 Flux 的方法以根据 ID 查询 redis 哈希(fluxB) 等
我认为我可能需要将我可能需要的每个函数定义为成员变量,如下所示:
private Function<String, Flux<String>> getIdFromSetFunction = new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
};
然后打电话给
return Flux.just(attKey).flatMap(getIdFromSetFunction).flatMap(getHash);
唯一的问题是在这些函数中执行的代码需要 Class 当前在我的方法调用中可用的信息。但我不确定这是正确的方法。
如有任何建议,我们将不胜感激!
从概念上讲,您 "composing mono objects" 不如 "creating a new one for each step"。
Mono<String> a = Mono.just("something");
Mono<String> b = a.flatMap( s -> goDoSomethingElseThatReturnsAMono(s));
String result = b.block();
你可以随心所欲地保持这样的链接。 (或者如果您有多个要单独处理的数据项,请使用 Mono.flatMapMany
进入 Flux
世界)。
在调用 block
之前什么都不会发生,因为它会订阅 b
并阻塞当前线程,直到结果可用。