RxAndroid 过滤器 Observable<List<Item>>

RxAndroid filter Observable<List<Item>>

我使用 RxAndroid,我想过滤 Observable<List<Item>>.

的结果

这是代码:

Observable<List<Item>> observable = 
Observable.create((Observable.OnSubscribe<List<Item>>) subscriber -> {
        subscriber.onNext(ItemManager.getItems());
        subscriber.onCompleted();
    });

我想使用 .filter() 只获取有效项目。这样做就完美了:

observable.filter(new Func1<Item, Boolean>() {
    @Override
        public Boolean call(Item item) {
            return item.isValid();
    }
});

但是,.filter() 迫使我实施 new Func1() :

observable.filter(new Func1<List<Item>, Boolean>() {
        @Override
            public Boolean call(Item item) {
        }
    });

有人知道如何解决这个问题吗?

提前谢谢你:)

我找到了这个解决方案,但我不知道这是否是解决问题的好方法:

Observable<List<Item>> observable = 
Observable.create((Observable.OnSubscribe<Item>) subscriber -> {
        for (Item item : ItemManager.getItems()) {
            subscriber.onNext(item);
        }

        subscriber.onCompleted();
    })
    .filter(item -> item.isValid());
     Observable.defer(() -> Observable.just(ItemManager.getItems())
            .subscribeOn(Schedulers.io())
            .flatMapIterable(items -> items)
            .filter(item -> item.isValid())
            .toList();

只需使用 from 函数创建 Observable:

    Observable<Item> observable = Observable.from(ItemManager.getItems())
    .filter(new Func1<Item, Boolean>() {
        @Override
        public Boolean call(Item item) {
            return item.isValid();
        }
    });

    observable.subscribe(System.out::println);

更新:

由于您需要 ItemManager.getItems() 被异步调用,因此您需要执行以下操作:

Observable<List<Item>> observable = 
Observable.create((Observable.OnSubscribe<Item>) subscriber -> {
        for (Item item : ItemManager.getItems()) {
            subscriber.onNext(item);
        }

        subscriber.onCompleted();
    })
.filter(item -> item.isValid())
.subscribeOn(Schedulers.io());

这样,订阅和对 ItemManager.getItems() 的调用将在 io 线程上执行。