RxJava 过滤并发出其他项目

RxJava filter and emit other items

是否可以像下面那样过滤并继续发射 itens?


我调用订阅者 2 次的代码:

Observable<Map.Entry<String, ArrayList<MockOverview>>> requestEntries =
        this.requestView.request(request)
        .map(HashMap::entrySet)
        .flatMapIterable(entries -> entries);

requestEntries.filter(entry -> entry.getKey().equals("featured"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {
            Log.i("subscrive", "featured");
        });

requestEntries.filter(entry -> entry.getKey().equals("done"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {
            Log.i("subscrive", "featured");
        });

我想要的:

 requestEntries.filter(entry -> entry.getKey().equals("featured"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {

        })
        .filter(entry -> entry.getKey().equals("done"))
        .map((Func1<Map.Entry<String, ArrayList<MockOverview>>, List<MockOverview>>) Map.Entry::getValue)
        .subscribe(mockOverviews -> {

        });

可以用doOnNext代替第一个subscribe()

 requestEntry.filter(v -> ...)
 .map(v -> ...)
 .doOnNext(v -> ...)
 .filter(v -> ...)
 .map(v -> ...)
 .subscribe(...)

或使用publish(Func1):

 requestEntry.filter(v -> ...)
 .map(v -> ...)
 .publish(o -> {
     o.subscribe(...);
     return o;
 })
 .filter(v -> ...)
 .map(v -> ...)
 .subscribe(...)

从外观上看,您的第二个版本与第一个版本不同:前者查看 requestEntries 流两次,过滤 featureddone 键分别用它做自己的事情。然而,您的第二个版本首先过滤 featured,然后进行一些转换和副作用,然后过滤掉 done。但是,Observable<entryset> 根本不在第二个过滤器 lambda 的范围内。

你在这里需要做的是在 requestEntries 上使用 publish(<lambda>) 并在 lambda 中做你第一个版本的东西,使用 onNext 而不是 subscribemerge 流和 return 组合流。然后在 publish 之外你订阅一次(并且什么都不做)或者继续在其他地方使用你的流的结果。

requestEntries.publish(re -> {
    Observable<...> x = re.filter(...<featured>...).map(...).doOnNext(...Log.i(...));
    Observable<...> y = re.filter(...<done>...).map(...).doOnNext(...Log.i(...));
    return x.mergeWith(y);
})