在 rxjava 中具有打开和关闭边界的缓冲区

Buffer with openning and clossing boundaries in rxjava

我正在尝试在 rxjava 中使用具有打开和关闭边界的缓冲区,但我无法让它工作。我想做的是假设一个 observable emits, 0, 1, 2, 3, 0 , 1, 3, 0, 3 我想以 {0, 1, 2, 3}, {0, 1 , 3}, {0, 3}.

这是我目前的代码:

PublishSubject openning = PublishSubject.create();
        openning.doOnNext(new Consumer() {
            @Override
            public void accept(@NonNull Object o) throws Exception {
                if(o.equals("0"))
                    openning.onNext(o);
            }
        });

        Observable<String> observableA = Observable.interval(1, TimeUnit.SECONDS).map(value -> String.valueOf(value  % 10));

        observableA.subscribe(openning);
        // TODO: Buffer by boundary
        observableA = observableA.buffer(openning, new Function<String, Observable<List<String>>>() {
            @Override
            public Observable<List<String>> apply(@NonNull String o) throws Exception {
                list.add(o);
                if (o.equals("0")) {
                    return Observable.just(list);
                } else {
                    list.add(o);
                    sb.append(o);
                    return Observable.never();
                }
            }
        }, new Callable() {
            @Override
            public Object call() throws Exception {
                return list;
            }
        });

任何帮助将不胜感激

如果你的意思是buffer应该在遇到3之后重新启动,看RxJava 2的extensions project里面的bufferUntil操作符

compile "com.github.akarnokd:rxjava2-extensions:0.17.5"

Flowable.just("1", "2", "#", "3", "#", "4", "#")
.compose(FlowableTransformers.bufferUntil(v -> "#".equals(v)))
.test()
.assertResult(
    Arrays.asList("1", "2", "#"),
    Arrays.asList("3", "#"),
    Arrays.asList("4", "#")
);

对于1.x,有rxjava-extras with Transformers.toListUntil()