RxJava rx.exceptions.MissingBackpressureException 带过滤器和地图

RxJava rx.exceptions.MissingBackpressureException with filter and map


我刚开始玩 RxJava/RxAndroid 并且在理解如何正确处理背压方面遇到了一些问题。

我有一个可观察到的文件扫描器,可以扫描目录并发出文件。应尽快处理这些文件,不要跳过任何文件。

所以管道看起来像这样: Observable<File> -> Filter<File, Boolean> {check if file is of type .xyz}

不幸的是,我收到 rx.exceptions.MissingBackpressureException 错误。所以我读到了背压,如果我理解正确的话,无损失选项只是缓冲区和 windows.

我试过了onBackpressureBuffer(), buffer() and window()。虽然所有 onBackpressureX() 命令似乎都没有效果,但 buffer() 将项目分组为 List<File>。我的问题是:

  1. 我应该如何过滤这些组? filter(<List<File>>, Boolean) 没有意义...
  2. 我如何在我的文件扫描器中实现可观察到的背压处理,以便它等到我的管道/操作员/订阅者有容量?
  3. 用例如map() 放入 XYZ 实体并将它们存储在单独的列表中而不是活跃的订阅者但作为运营商的副作用?

一些反馈甚至提示会有很大帮助,我们将不胜感激。

我想我找到了解决问题的方法: 此代码无效:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
       .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getAbsolutePath().endsWith("xyz");
            }
        })
        .buffer(100)
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

然而这段代码正在工作:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getAbsolutePath().endsWith("xyz");
            }
        })
        .buffer(100)
       .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

看来 subscribeOn()observeOn() 的顺序差别很大!

我的第三个问题有点跑题,但仍然悬而未决。也许有人可以对此发表评论。