如何根据流中的元素动态划分连续的 RXJava 流?
How can I divide a continuous RXJava stream on the fly based on the elements in the stream?
最简单的示例是这样的字符串流:
["3", "a", "b", "c", "1", "a", "2", "a", "b"]
那些是数字的描述了它的组应该包含多少个元素。
非常重要的是流是连续的,所以我们不能等待下一个数字来拆分流。
据我所知,在 RXJava2 中没有针对此的内置功能
var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable/*Something here*/.blockingSubscribe(System.out::println);
预期输出为:
[3, a, b, c]
[1, a]
[2, a, b]
我后来找到了 akarnokd 的 RxJava2Extensions 包。使用它,我能够构建它,它可以满足我的要求:
var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable.compose(FlowableTransformers.bufferUntil(new Predicate<>() {
private int remaining = 0;
@Override
public boolean test(String next) {
if(next.chars().allMatch(Character::isDigit)) {
remaining = Integer.parseInt(next);
}
return --remaining < 0;
}
})).blockingSubscribe(System.out::println);
最简单的示例是这样的字符串流:
["3", "a", "b", "c", "1", "a", "2", "a", "b"]
那些是数字的描述了它的组应该包含多少个元素。
非常重要的是流是连续的,所以我们不能等待下一个数字来拆分流。
据我所知,在 RXJava2 中没有针对此的内置功能
var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable/*Something here*/.blockingSubscribe(System.out::println);
预期输出为:
[3, a, b, c]
[1, a]
[2, a, b]
我后来找到了 akarnokd 的 RxJava2Extensions 包。使用它,我能够构建它,它可以满足我的要求:
var flowable = Flowable.concat(Flowable.fromArray("3", "a", "b", "c", "1", "a", "2", "a", "b"), Flowable.never());
flowable.compose(FlowableTransformers.bufferUntil(new Predicate<>() {
private int remaining = 0;
@Override
public boolean test(String next) {
if(next.chars().allMatch(Character::isDigit)) {
remaining = Integer.parseInt(next);
}
return --remaining < 0;
}
})).blockingSubscribe(System.out::println);