如何对 Flux 执行移动 window 计算并将结果输出为新的 Flux
How to perform a moving window calculation on a Flux and output the result as a new Flux
我想对 Flux 执行移动 window 计算并生成包含计算值的 Flux,但我不知道如何完成此操作。
作为一个简化的例子,假设我有一个整数 Flux,我想用这个 Flux 中每 3 个连续整数的总和生成一个新的 Flux。举例说明:
第一个通量包含从 1 到 8 的整数:{1, 2, 3, 4, 5, 6, 7, 8}
结果通量应包含总和:{1+2+3、2+3+4、3+4+5、4+5+6、5+6+7、6+7+8}
我可以很容易地生成第一个 Flux 并导出包含连续 3 个值的通量通量,如下所示:
Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);
我也可以订阅 () 到 f2 并计算总和,但我不知道如何同时将这些总和发布为新的 Flux。
我是漏掉了一些简单的东西,还是这种事情真的很难做?
您可以对内部通量使用 .reduce(Integer::sum)
对 window 中的元素求和,对外部通量使用 .flatMap
将这些总和合并回单个流。
请注意,由于 .window
是用 maxSize < skip
调用的,因此尾随 windows 中的项目将小于最大大小。
Flux<Integer> sums = Flux.range(1, 8) // Flux<Integer>
.window(3, 1) // Flux<Flux<Integer>>
.flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>
StepVerifier.create(sums)
.expectNext(6) // 1+2+3
.expectNext(9) // 2+3+4
.expectNext(12) // 3+4+5
.expectNext(15) // 4+5+6
.expectNext(18) // 5+6+7
.expectNext(21) // 6+7+8
.expectNext(15) // 7+8
.expectNext(8) // 8
.verifyComplete();
我想对 Flux 执行移动 window 计算并生成包含计算值的 Flux,但我不知道如何完成此操作。
作为一个简化的例子,假设我有一个整数 Flux,我想用这个 Flux 中每 3 个连续整数的总和生成一个新的 Flux。举例说明:
第一个通量包含从 1 到 8 的整数:{1, 2, 3, 4, 5, 6, 7, 8}
结果通量应包含总和:{1+2+3、2+3+4、3+4+5、4+5+6、5+6+7、6+7+8}
我可以很容易地生成第一个 Flux 并导出包含连续 3 个值的通量通量,如下所示:
Flux<Integer> f1 = Flux.range(1,8);
Flux<Flux<Integer>> f2 = f1.window(3,1);
我也可以订阅 () 到 f2 并计算总和,但我不知道如何同时将这些总和发布为新的 Flux。
我是漏掉了一些简单的东西,还是这种事情真的很难做?
您可以对内部通量使用 .reduce(Integer::sum)
对 window 中的元素求和,对外部通量使用 .flatMap
将这些总和合并回单个流。
请注意,由于 .window
是用 maxSize < skip
调用的,因此尾随 windows 中的项目将小于最大大小。
Flux<Integer> sums = Flux.range(1, 8) // Flux<Integer>
.window(3, 1) // Flux<Flux<Integer>>
.flatMap(window -> window.reduce(Integer::sum)); // Flux<Integer>
StepVerifier.create(sums)
.expectNext(6) // 1+2+3
.expectNext(9) // 2+3+4
.expectNext(12) // 3+4+5
.expectNext(15) // 4+5+6
.expectNext(18) // 5+6+7
.expectNext(21) // 6+7+8
.expectNext(15) // 7+8
.expectNext(8) // 8
.verifyComplete();