使用 Rx 读取文件 Java

Read Files using Rx Java

我有一个文件夹中的许多文件,我正在读取这些文件并将其内容存储到数据库中。我怎样才能使用 RX Java 执行相同的操作?因为我有很多文件要读取并且在 Async Task

中花费了很多时间
String path = "some path";
        File directory = new File(path);
        File[] files = directory.listFiles();
        Log.d(TAG, "Size: " + files.length);
        for (int i = 0; i < files.length; i++) {
            Log.d(TAG, "FileName -> " + files[i].getName());

         //execute async to read file
        }

您可以非常轻松地用 RxJava 中的多线程替换单线程方法(与不使用 RxJava 相比)。 AsyncTask 使用 executeOnExecutor(Executor exec, Params... params) 提供并行执行,因此您当前的实现可以针对 AsyncTask 进行调整以处理单个文件而不是完整的目录。但是你的问题是关于 RxJava 的。

多线程将加快文件读取速度 - 速度提升将取决于很多因素,但主要因素是您的闪存顺序读取/随机读取速度。鉴于这会很冷 Observable 他们使用 Flowable 真的没有意义,因为背压不会在这里发挥作用。

实现某种多线程的伪代码:

    Observable.just(directory)
            .flatMap(dir ->
                    Observable.fromArray(dir.listFiles()))
            .filter(File::isFile)
            .flatMap(file ->
                    Observable.just(file)
                            .map(this::readFileFunctionWithReturn)
                            .subscribeOn(Schedulers.io())) // execute on multiple threads
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()) // observe on UI, or choose another Scheduler
            .subscribe(fileResult -> {
                // unordered results
            });

如果您想对线程进行更细粒度的控制,也可以创建由 Executor 支持的自定义 Scheduler 实现,即自定义 ThreadFactory.