如何使用 RxJava 按值过滤一行中的事件序列

How to filter sequence of events in a row with RxJava by its value

我正在像这样使用 rxjava2 生成事件流。

Observable<Integer> observable = booleanPublisher.map(aBoolean -> {
        if(aBoolean){
            return 1;
        }else{
            return 0;
        }
    }).buffer(4).map( aLilst ->{
        int sum = 0;
        for (Integer i: aLilst) {
            sum +=i;
        }
        return sum;
    });
    observable.subscribe(
            aInt ->{
                Log.v("Value",String.valueOf(aInt));
            }
    );

这会产生一个值从 0..4 开始的 int 事件流,现在我想过滤这个流,例如在一行中的 2 个整数事件小于 4 时发出一个事件,而另一个事件是一行中有 10 个整数事件具有 4 个值。我试图分离 2 个不同的可观察对象并合并,但没有成功,因为这 2 个事件必须相互重置。

提前致谢。

实现它的一种方法是使用 scan 运算符。正如文档所说:

The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission.

作为函数的返回值,您可以获得符合条件的行中的项目数。

我认为有一种方法可以在单个订阅中完成它,但我认为通过 publish() 运算符使用 ConnectableObservable 来拆分它更清楚。让我们举个例子:

Observable<Integer> getSource() {
    return Observable.just(1, 2, 3, 4, 4, 4, 4, 4, 2);
}

现在我们创建观察者来检查两个连续的小于 4 的项目。

Observable<Integer> source = getSource();

source.scan(0, (integer, integer2) -> {
    if (integer2 < 4) {
        return integer + 1;
    }
    return 0;
    }).filter(integer -> integer >= 2)
    .subscribe(integer -> logd("two items in a row, smaller than 4."));

而接收事件的订阅者为五个(为了简化)连续值等于 4。

source.scan(0, (integer, integer2) -> {
    if (integer2 == 4) {
        return integer + 1;
    }
    return 0;
    }).filter(integer -> integer >= 5) // for you it will be ten
    .subscribe(integer -> logd("five items in a row equals to 4."));

现在您只需拨打:

observable.connect();

注意:当条件匹配时,当前实现不会重置计数器。这意味着,例如,如果您正在寻找一个包含五个连续值且等于 4 的序列,则此序列:

4 4 4 4 4 4

将生成两个匹配项,因为第一个匹配项是序列的前五个项目,但是当您收到一个值为 4 的新项目时,您仍然有一个匹配项。

另一种避免使用两个不同订阅的方法是使用 merge,这样:

enum MatchResult{
    TWO_ITEM_SMALL,
    FIVE_CONSECUTIVE
}


Observable<MatchResult> getObservable1(Observable<Integer> observable) {
        return observable
                .scan(0, (integer, integer2) -> {
                    if (integer2 < 4) {
                        return integer + 1;
                    }
                    return 0;
                }).filter(integer -> integer >= 2)
                .map(integer -> MatchResult.TWO_ITEM_SMALL);
    }

Observable<MatchResult> getObservable2(Observable<Integer> observable) {
    return observable
            .scan(0, (integer, integer2) -> {
                if (integer2 == 4) {
                    return integer + 1;
                } else {
                    return 0;
                }
            }).filter(integer -> integer >= 5)
            .map(integer -> MatchResult.FIVE_CONSECUTIVE);
    }


Observable.merge(getObservable1(getSoruce()), getObservable2(getSoruce()))
            .subscribe(matchResult -> {
                if(matchResult == MatchResult.TWO_ITEM_SMALL) {
                    logd("two items in a row, smaller than 4.");
                } else {
                    logd("five items in a row equals to 4");
                }
            });