Reactor 中 groupBy 运算符的替代方案

Alternative to groupBy operator in Reactor

这是 this question 的后续问题。答案中建议的解决方案是使用 groupBy 运算符。这通常没问题,但正如其文档中所述,不建议使用大量不同的键,比如数万个。

data
  .groupBy(Data::getPointID)
  .flatMap(sameIdFlux -> sameIdFlux
    .concatMap(processor::process)
  )
  .subscribe();

每一组都有无限的自然元素,随时可能到来。我还需要限制并发处理的组数。据我了解,如果我使用上面的代码,要么我会达到开放组的隐式限制并且新组不会被打开(处理),要么 Ï 最终会达到内存不足,因为甚至很长时间不活动的组不会被关闭(想想删除的实体),因此会白白消耗一些内存开销。

是否可以使用某些运算符/模式来实现相同的行为,而无需 运行 解决上述问题?我最初尝试用一些合理的 Duration 来关闭每个组,但是当一个组关闭并且相同的 ID 到达时,我对竞争条件持开放态度,因此它们将被并行处理,这是不希望的。

编辑:我正在调查更多并尝试更多方法,目前我最大的问题似乎是如何正确管理背压/正确限制最大并发性而不限制组本身的数量。数据生成通常是线性的,但有时会产生较大的尖峰,我需要相应地加以限制。

我是 spring-fluxproject-reactor 领域的新手,所以我不知道有任何开箱即用的模式可以解决您的问题。但是,您可以创建自己的模式来限制使用 groupBy 运算符创建的组数。

在下面的示例中,我使用了受 this blog post of Apache Flink 启发的 int partition = i % numberOfPartitions; 模式来决定拆分流的分区数。

    public Flux<GroupedFlux<Integer, Data>> createFluxUsingGroupBy(List<String> dataList, int numberOfPartitions, int maxCount) {
        return Flux
                .fromStream(IntStream.range(0, maxCount)
                        .mapToObj(i -> {
                            int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
                            int partition = i % numberOfPartitions;
                            return new Data(i, dataList.get(randomPosition), partition);
                        })
                )
                .delayElements(Duration.ofMillis(10))
                .log()
                .groupBy(Data::getPartition);
    }
........
@lombok.Data
@AllArgsConstructor
@NoArgsConstructor
public class Data {
    private Integer key;
    private String value;
    private Integer partition;
}

当我使用 numberOfPartitions = 3 执行它时,无论我使用的密钥是什么,我都会有 0 到 2 个分区(3 个分区)。

    @Test
    void testFluxUsingGroupBy() {
        int numberOfPartitions = 3;
        int maxCount = 100;
        Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingGroupBy(expect, numberOfPartitions, maxCount);
        StepVerifier.create(dataGroupedFlux)
                .expectNextCount(numberOfPartitions)
                .verifyComplete();
    }

这是日志:

10:43:02.168 [Test worker] INFO reactor.Flux.ConcatMap.1 - onSubscribe(FluxConcatMap.ConcatMapImmediate)
10:43:02.179 [Test worker] INFO reactor.Flux.ConcatMap.1 - request(256)
10:43:02.291 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=0, value=Spring, partition=0))
10:43:02.362 [parallel-1] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.375 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=1, value=Scala, partition=1))
10:43:02.377 [parallel-2] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.388 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=2, value=reactive programming, partition=2))
10:43:02.389 [parallel-3] INFO reactor.Flux.ConcatMap.1 - request(1)
10:43:02.400 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=3, value=java with lambda, partition=0))
10:43:02.411 [parallel-1] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=4, value=Spring, partition=1))
10:43:02.422 [parallel-2] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=5, value=java 8, partition=2))
10:43:02.433 [parallel-3] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=6, value=java with lambda, partition=0))
10:43:02.444 [parallel-4] INFO reactor.Flux.ConcatMap.1 - onNext(Data(key=7, value=java with lambda, partition=1))
...

为了在 Data 对象上没有 private Integer key; 可用的情况下增强此解决方案,我可以基于哈希生成分区。我使用了另一个参数,即 parallelism。如果您使用 X 的并行度将值保存在存储上,并且之后读取相同的值但使用不同的并行度 != X ,则它基本上用于恢复操作,您可以将这些值保存在相同的位置团体。所以我使用了 int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions; ,这也是受到我提到的博客 post 的启发。我更喜欢这种方法。

    public Flux<GroupedFlux<Integer, Data>> createFluxUsingHashGroupBy(List<String> dataList, int numberOfPartitions, int parallelism, int maxCount) {
        return Flux
                .fromStream(IntStream.range(0, maxCount)
                        .mapToObj(i -> {
                            int randomPosition = ThreadLocalRandom.current().nextInt(0, dataList.size());
                            String value = dataList.get(randomPosition);
                            int partition = (getDifferentHashCode(value) * parallelism) % numberOfPartitions;
                            return new Data(i, value, partition);
                        })
                )
                .delayElements(Duration.ofMillis(10))
                .log()
                .groupBy(Data::getPartition);
    }

    public int getDifferentHashCode(String value) {
        int hash = 7;
        for (int i = 0; i < value.length(); i++) {
            hash = hash * 31 + value.charAt(i);
        }
        return hash;
    }

单元测试:

    @Test
    void testFluxUsingHashGroupBy() {
        int numberOfPartitions = 3;
        int parallelism = 2;
        int maxCount = 100;
        Flux<GroupedFlux<Integer, Data>> dataGroupedFlux = fluxAndMonoTransformations.createFluxUsingHashGroupBy(expect, numberOfPartitions, parallelism, maxCount);
        StepVerifier.create(dataGroupedFlux)
                .expectNextCount(numberOfPartitions)
                .verifyComplete();
    }

关于背压问题,我认为它可以出现在另一个 SO 问题中。