如何根据流中的元素动态划分连续的 RXJava 流?

How can I divide a continuous RXJava stream on the fly based on the elements in the stream?

最简单的示例是这样的字符串流:

["3", "a", "b", "c", "1", "a", "2", "a", "b"]

那些是数字的描述了它的组应该包含多少个元素。

非常重要的是流是连续的,所以我们不能等待下一个数字来拆分流。

据我所知,在 RXJava2 中没有针对此的内置功能

var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());

flowable/*Something here*/.blockingSubscribe(System.out::println);

预期输出为:

[3, a, b, c]
[1, a]
[2, a, b]

我后来找到了 akarnokd 的 RxJava2Extensions 包。使用它,我能够构建它,它可以满足我的要求:


var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable.compose(FlowableTransformers.bufferUntil(new Predicate<>() {
    private int remaining = 0;
    @Override
    public boolean test(String next) {
        if(next.chars().allMatch(Character::isDigit)) {
            remaining = Integer.parseInt(next);
        }
        return --remaining < 0;
    }
})).blockingSubscribe(System.out::println);