RxJava 2:如何过滤发出 "List<Item>" 的无限流的项目?

RxJava 2: How to filter items of infinite stream that emits "List<Item>"?

我有一个 Observable 永远不会完成。它发出 List<Item>。每次发出该列表时,我都需要过滤掉其中的一些项目。目前我有这个作为解决方案:

mData.getItemsObservable() // Observable<List<Item>>
        .compose(...)
        .flatMapSingle(items -> Observable.fromIterable(items)
                .filter(item -> item.someCondition())
                .toList())
        .subscribe(items -> {
            // ...
        }, error -> {
            // ...
        });

这是过滤掉某些项目的最佳方式吗?有没有更简单(更易读)的方法来做同样的事情?

我也试过这个,但它没有发出任何东西:

mData.getItemsObservable() // Observable<List<Item>>
        .compose(...)
        .flatMap(Observable::fromIterable) // or like this: flatMapIterable(items -> items)
        .filter(item -> item.someCondition())
        .toList()
        .subscribe(items -> {
            // ...
        }, error -> {
            // ...
        });

如果您想坚持 RxJava,第一种方法就可以了。否则,您可以使用 IxJava 并直接在 map 操作中执行过滤:

mData.getItemsObservable() // Observable<List<Item>>
    .compose(...)
    .map(v -> Ix.from(v).filter(w -> w.someCondition()).toList())
    .subscribe(items -> {
        // ...
    }, error -> {
        // ...
    });