Reactor 中 groupBy 运算符的替代方案
Alternative to groupBy operator in Reactor
这是 this question 的后续问题。答案中建议的解决方案是使用 groupBy
运算符。这通常没问题,但正如其文档中所述,不建议使用大量不同的键,比如数万个。
data
.groupBy(Data::getPointID)
.flatMap(sameIdFlux -> sameIdFlux
.concatMap(processor::process)
)
.subscribe();
每一组都有无限的自然元素,随时可能到来。我还需要限制并发处理的组数。据我了解,如果我使用上面的代码,要么我会达到开放组的隐式限制并且新组不会被打开(处理),要么 Ï 最终会达到内存不足,因为甚至很长时间不活动的组不会被关闭(想想删除的实体),因此会白白消耗一些内存开销。
是否可以使用某些运算符/模式来实现相同的行为,而无需 运行 解决上述问题?我最初尝试用一些合理的 Duration
来关闭每个组,但是当一个组关闭并且相同的 ID 到达时,我对竞争条件持开放态度,因此它们将被并行处理,这是不希望的。
编辑:我正在调查更多并尝试更多方法,目前我最大的问题似乎是如何正确管理背压/正确限制最大并发性而不限制组本身的数量。数据生成通常是线性的,但有时会产生较大的尖峰,我需要相应地加以限制。
我是 spring-flux
和 project-reactor
领域的新手,所以我不知道有任何开箱即用的模式可以解决您的问题。但是,您可以创建自己的模式来限制使用 groupBy
运算符创建的组数。
在下面的示例中,我使用了受 this blog post of Apache Flink 启发的 int partition = i % numberOfPartitions;
模式来决定拆分流的分区数。
public Flux<GroupedFlux<Integer, Data>> createFluxUsingGroupBy(List<String> dataList, int numberOfPartitions, int maxCount) {
return Flux
.fromStream(IntStream.range(0, maxCount)
.mapToObj(i -> {
int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
int partition = i % numberOfPartitions;
return new Data(i, dataList.get(randomPosition), partition);
})
)
.delayElements(Duration.ofMillis(10))
.log()
.groupBy(Data::getPartition);
}
........
@lombok.Data
@AllArgsConstructor
@NoArgsConstructor
public class Data {
private Integer key;
private String value;
private Integer partition;
}
当我使用 numberOfPartitions = 3
执行它时,无论我使用的密钥是什么,我都会有 0 到 2 个分区(3 个分区)。
@Test
void testFluxUsingGroupBy() {
int numberOfPartitions = 3;
int maxCount = 100;
Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingGroupBy(expect, numberOfPartitions, maxCount);
StepVerifier.create(dataGroupedFlux)
.expectNextCount(numberOfPartitions)
.verifyComplete();
}
这是日志:
10:43:02.168 [Test worker] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
10:43:02.179 [Test worker] INFO reactor.Flux.ConcatMap.1 - request(256)
10:43:02.291 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=0, value=Spring, partition=0))
10:43:02.362 [parallel-1] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.375 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=1, value=Scala, partition=1))
10:43:02.377 [parallel-2] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.388 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=2, value=reactive programming, partition=2))
10:43:02.389 [parallel-3] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.400 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=3, value=java with lambda, partition=0))
10:43:02.411 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=4, value=Spring, partition=1))
10:43:02.422 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=5, value=java 8, partition=2))
10:43:02.433 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=6, value=java with lambda, partition=0))
10:43:02.444 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=7, value=java with lambda, partition=1))
...
为了在 Data
对象上没有 private Integer key;
可用的情况下增强此解决方案,我可以基于哈希生成分区。我使用了另一个参数,即 parallelism
。如果您使用 X
的并行度将值保存在存储上,并且之后读取相同的值但使用不同的并行度 != X
,则它基本上用于恢复操作,您可以将这些值保存在相同的位置团体。所以我使用了 int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
,这也是受到我提到的博客 post 的启发。我更喜欢这种方法。
public Flux<GroupedFlux<Integer, Data>> createFluxUsingHashGroupBy(List<String> dataList, int numberOfPartitions, int parallelism, int maxCount) {
return Flux
.fromStream(IntStream.range(0, maxCount)
.mapToObj(i -> {
int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
String value = dataList.get(randomPosition);
int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
return new Data(i, value, partition);
})
)
.delayElements(Duration.ofMillis(10))
.log()
.groupBy(Data::getPartition);
}
public int getDifferentHashCode(String value) {
int hash = 7;
for (int i = 0; i < value.length(); i++) {
hash = hash * 31 + value.charAt(i);
}
return hash;
}
单元测试:
@Test
void testFluxUsingHashGroupBy() {
int numberOfPartitions = 3;
int parallelism = 2;
int maxCount = 100;
Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingHashGroupBy(expect, numberOfPartitions, parallelism, maxCount);
StepVerifier.create(dataGroupedFlux)
.expectNextCount(numberOfPartitions)
.verifyComplete();
}
关于背压问题,我认为它可以出现在另一个 SO 问题中。
这是 this question 的后续问题。答案中建议的解决方案是使用 groupBy
运算符。这通常没问题,但正如其文档中所述,不建议使用大量不同的键,比如数万个。
data
.groupBy(Data::getPointID)
.flatMap(sameIdFlux -> sameIdFlux
.concatMap(processor::process)
)
.subscribe();
每一组都有无限的自然元素,随时可能到来。我还需要限制并发处理的组数。据我了解,如果我使用上面的代码,要么我会达到开放组的隐式限制并且新组不会被打开(处理),要么 Ï 最终会达到内存不足,因为甚至很长时间不活动的组不会被关闭(想想删除的实体),因此会白白消耗一些内存开销。
是否可以使用某些运算符/模式来实现相同的行为,而无需 运行 解决上述问题?我最初尝试用一些合理的 Duration
来关闭每个组,但是当一个组关闭并且相同的 ID 到达时,我对竞争条件持开放态度,因此它们将被并行处理,这是不希望的。
编辑:我正在调查更多并尝试更多方法,目前我最大的问题似乎是如何正确管理背压/正确限制最大并发性而不限制组本身的数量。数据生成通常是线性的,但有时会产生较大的尖峰,我需要相应地加以限制。
我是 spring-flux
和 project-reactor
领域的新手,所以我不知道有任何开箱即用的模式可以解决您的问题。但是,您可以创建自己的模式来限制使用 groupBy
运算符创建的组数。
在下面的示例中,我使用了受 this blog post of Apache Flink 启发的 int partition = i % numberOfPartitions;
模式来决定拆分流的分区数。
public Flux<GroupedFlux<Integer, Data>> createFluxUsingGroupBy(List<String> dataList, int numberOfPartitions, int maxCount) {
return Flux
.fromStream(IntStream.range(0, maxCount)
.mapToObj(i -> {
int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
int partition = i % numberOfPartitions;
return new Data(i, dataList.get(randomPosition), partition);
})
)
.delayElements(Duration.ofMillis(10))
.log()
.groupBy(Data::getPartition);
}
........
@lombok.Data
@AllArgsConstructor
@NoArgsConstructor
public class Data {
private Integer key;
private String value;
private Integer partition;
}
当我使用 numberOfPartitions = 3
执行它时,无论我使用的密钥是什么,我都会有 0 到 2 个分区(3 个分区)。
@Test
void testFluxUsingGroupBy() {
int numberOfPartitions = 3;
int maxCount = 100;
Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingGroupBy(expect, numberOfPartitions, maxCount);
StepVerifier.create(dataGroupedFlux)
.expectNextCount(numberOfPartitions)
.verifyComplete();
}
这是日志:
10:43:02.168 [Test worker] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
10:43:02.179 [Test worker] INFO reactor.Flux.ConcatMap.1 - request(256)
10:43:02.291 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=0, value=Spring, partition=0))
10:43:02.362 [parallel-1] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.375 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=1, value=Scala, partition=1))
10:43:02.377 [parallel-2] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.388 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=2, value=reactive programming, partition=2))
10:43:02.389 [parallel-3] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.400 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=3, value=java with lambda, partition=0))
10:43:02.411 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=4, value=Spring, partition=1))
10:43:02.422 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=5, value=java 8, partition=2))
10:43:02.433 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=6, value=java with lambda, partition=0))
10:43:02.444 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=7, value=java with lambda, partition=1))
...
为了在 Data
对象上没有 private Integer key;
可用的情况下增强此解决方案,我可以基于哈希生成分区。我使用了另一个参数,即 parallelism
。如果您使用 X
的并行度将值保存在存储上,并且之后读取相同的值但使用不同的并行度 != X
,则它基本上用于恢复操作,您可以将这些值保存在相同的位置团体。所以我使用了 int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
,这也是受到我提到的博客 post 的启发。我更喜欢这种方法。
public Flux<GroupedFlux<Integer, Data>> createFluxUsingHashGroupBy(List<String> dataList, int numberOfPartitions, int parallelism, int maxCount) {
return Flux
.fromStream(IntStream.range(0, maxCount)
.mapToObj(i -> {
int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
String value = dataList.get(randomPosition);
int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
return new Data(i, value, partition);
})
)
.delayElements(Duration.ofMillis(10))
.log()
.groupBy(Data::getPartition);
}
public int getDifferentHashCode(String value) {
int hash = 7;
for (int i = 0; i < value.length(); i++) {
hash = hash * 31 + value.charAt(i);
}
return hash;
}
单元测试:
@Test
void testFluxUsingHashGroupBy() {
int numberOfPartitions = 3;
int parallelism = 2;
int maxCount = 100;
Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingHashGroupBy(expect, numberOfPartitions, parallelism, maxCount);
StepVerifier.create(dataGroupedFlux)
.expectNextCount(numberOfPartitions)
.verifyComplete();
}
关于背压问题,我认为它可以出现在另一个 SO 问题中。