如何使用 RxJava 按值过滤一行中的事件序列
How to filter sequence of events in a row with RxJava by its value
我正在像这样使用 rxjava2 生成事件流。
Observable<Integer> observable = booleanPublisher.map(aBoolean -> {
if(aBoolean){
return 1;
}else{
return 0;
}
}).buffer(4).map( aLilst ->{
int sum = 0;
for (Integer i: aLilst) {
sum +=i;
}
return sum;
});
observable.subscribe(
aInt ->{
Log.v("Value",String.valueOf(aInt));
}
);
这会产生一个值从 0..4 开始的 int 事件流,现在我想过滤这个流,例如在一行中的 2 个整数事件小于 4 时发出一个事件,而另一个事件是一行中有 10 个整数事件具有 4 个值。我试图分离 2 个不同的可观察对象并合并,但没有成功,因为这 2 个事件必须相互重置。
提前致谢。
实现它的一种方法是使用 scan
运算符。正如文档所说:
The Scan operator applies a function to the first item emitted by the
source Observable and then emits the result of that function as its
own first emission.
作为函数的返回值,您可以获得符合条件的行中的项目数。
我认为有一种方法可以在单个订阅中完成它,但我认为通过 publish()
运算符使用 ConnectableObservable
来拆分它更清楚。让我们举个例子:
Observable<Integer> getSource() {
return Observable.just(1, 2, 3, 4, 4, 4, 4, 4, 2);
}
现在我们创建观察者来检查两个连续的小于 4 的项目。
Observable<Integer> source = getSource();
source.scan(0, (integer, integer2) -> {
if (integer2 < 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 2)
.subscribe(integer -> logd("two items in a row, smaller than 4."));
而接收事件的订阅者为五个(为了简化)连续值等于 4。
source.scan(0, (integer, integer2) -> {
if (integer2 == 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 5) // for you it will be ten
.subscribe(integer -> logd("five items in a row equals to 4."));
现在您只需拨打:
observable.connect();
注意:当条件匹配时,当前实现不会重置计数器。这意味着,例如,如果您正在寻找一个包含五个连续值且等于 4 的序列,则此序列:
4 4 4 4 4 4
将生成两个匹配项,因为第一个匹配项是序列的前五个项目,但是当您收到一个值为 4 的新项目时,您仍然有一个匹配项。
另一种避免使用两个不同订阅的方法是使用 merge
,这样:
enum MatchResult{
TWO_ITEM_SMALL,
FIVE_CONSECUTIVE
}
Observable<MatchResult> getObservable1(Observable<Integer> observable) {
return observable
.scan(0, (integer, integer2) -> {
if (integer2 < 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 2)
.map(integer -> MatchResult.TWO_ITEM_SMALL);
}
Observable<MatchResult> getObservable2(Observable<Integer> observable) {
return observable
.scan(0, (integer, integer2) -> {
if (integer2 == 4) {
return integer + 1;
} else {
return 0;
}
}).filter(integer -> integer >= 5)
.map(integer -> MatchResult.FIVE_CONSECUTIVE);
}
Observable.merge(getObservable1(getSoruce()), getObservable2(getSoruce()))
.subscribe(matchResult -> {
if(matchResult == MatchResult.TWO_ITEM_SMALL) {
logd("two items in a row, smaller than 4.");
} else {
logd("five items in a row equals to 4");
}
});
我正在像这样使用 rxjava2 生成事件流。
Observable<Integer> observable = booleanPublisher.map(aBoolean -> {
if(aBoolean){
return 1;
}else{
return 0;
}
}).buffer(4).map( aLilst ->{
int sum = 0;
for (Integer i: aLilst) {
sum +=i;
}
return sum;
});
observable.subscribe(
aInt ->{
Log.v("Value",String.valueOf(aInt));
}
);
这会产生一个值从 0..4 开始的 int 事件流,现在我想过滤这个流,例如在一行中的 2 个整数事件小于 4 时发出一个事件,而另一个事件是一行中有 10 个整数事件具有 4 个值。我试图分离 2 个不同的可观察对象并合并,但没有成功,因为这 2 个事件必须相互重置。
提前致谢。
实现它的一种方法是使用 scan
运算符。正如文档所说:
The Scan operator applies a function to the first item emitted by the source Observable and then emits the result of that function as its own first emission.
作为函数的返回值,您可以获得符合条件的行中的项目数。
我认为有一种方法可以在单个订阅中完成它,但我认为通过 publish()
运算符使用 ConnectableObservable
来拆分它更清楚。让我们举个例子:
Observable<Integer> getSource() {
return Observable.just(1, 2, 3, 4, 4, 4, 4, 4, 2);
}
现在我们创建观察者来检查两个连续的小于 4 的项目。
Observable<Integer> source = getSource();
source.scan(0, (integer, integer2) -> {
if (integer2 < 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 2)
.subscribe(integer -> logd("two items in a row, smaller than 4."));
而接收事件的订阅者为五个(为了简化)连续值等于 4。
source.scan(0, (integer, integer2) -> {
if (integer2 == 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 5) // for you it will be ten
.subscribe(integer -> logd("five items in a row equals to 4."));
现在您只需拨打:
observable.connect();
注意:当条件匹配时,当前实现不会重置计数器。这意味着,例如,如果您正在寻找一个包含五个连续值且等于 4 的序列,则此序列:
4 4 4 4 4 4
将生成两个匹配项,因为第一个匹配项是序列的前五个项目,但是当您收到一个值为 4 的新项目时,您仍然有一个匹配项。
另一种避免使用两个不同订阅的方法是使用 merge
,这样:
enum MatchResult{
TWO_ITEM_SMALL,
FIVE_CONSECUTIVE
}
Observable<MatchResult> getObservable1(Observable<Integer> observable) {
return observable
.scan(0, (integer, integer2) -> {
if (integer2 < 4) {
return integer + 1;
}
return 0;
}).filter(integer -> integer >= 2)
.map(integer -> MatchResult.TWO_ITEM_SMALL);
}
Observable<MatchResult> getObservable2(Observable<Integer> observable) {
return observable
.scan(0, (integer, integer2) -> {
if (integer2 == 4) {
return integer + 1;
} else {
return 0;
}
}).filter(integer -> integer >= 5)
.map(integer -> MatchResult.FIVE_CONSECUTIVE);
}
Observable.merge(getObservable1(getSoruce()), getObservable2(getSoruce()))
.subscribe(matchResult -> {
if(matchResult == MatchResult.TWO_ITEM_SMALL) {
logd("two items in a row, smaller than 4.");
} else {
logd("five items in a row equals to 4");
}
});