RxAndroid:了解单个操作何时完成以及所有操作何时完成

RxAndroid: Know When Individual Operations Complete as well as When All of them Completes

我是 RxJava 的新手,我已经设法通过 Retrofit 实现 RxJava 以使用 flatMap 同时下载多个文件。我已成功收到单个下载的 onComplete 状态。但是我无法在收到 所有 下载的完成状态时实现功能。

下面是我用于多次下载的代码:

private void downloadFile(String url, final File parentFolder) {
    final Uri uri = Uri.parse(url);
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(url.replace(uri.getLastPathSegment(),""))
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    RestApi restApi = retrofit.create(RestApi.class);
    restApi.downloadFile(url)
            .flatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    return Observable.fromCallable(new Callable<File>() {
                        @Override
                        public File call() throws Exception {
                            File file = new File(parentFolder.getAbsoluteFile(), uri.getLastPathSegment());

                            if (!file.exists()) {
                                file.createNewFile();

                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                sink.writeAll(responseBodyResponse.body().source());
                                sink.close();
                            }
                            return file;
                        }
                    });
                }
            },3)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {

                @Override
                public void onCompleted() {
                    dManager.logMe(TAG,"file download complete");
                }

                @Override
                public void onError(Throwable e) {
                    dManager.logMe(TAG,"file download error");
                    e.printStackTrace();
                }

                @Override
                public void onNext(File file) {
                    dManager.logMe(TAG,"file download onNext: " + file.getAbsolutePath());
                }
            });
}

无论何时收到 URL 信息,我都会在多个 for 循环中调用 downloadFile(String url, final File parentFolder。 提前致谢。

假设您有 List<String> 个网址:

    Observable.from(urls)
            .flatMap(url -> restApi.downloadFile(url))
            .concatMap(new Func1<Response<ResponseBody>, Observable<File>>() {
                @Override
                public Observable<File> call(final Response<ResponseBody> responseBodyResponse) {
                    try {
                        File file = new File(parentFolder.getAbsoluteFile(), uri.getLastPathSegment());
                        if (!file.exists()) {
                            file.createNewFile();
                        }
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(responseBodyResponse.body().source());
                        sink.close();
                        return Observable.just(file);
                    } catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<File>() {

                @Override
                public void onCompleted() {
                    dManager.logMe(TAG, "file download complete");
                }

                @Override
                public void onError(Throwable e) {
                    dManager.logMe(TAG, "file download error");
                    e.printStackTrace();
                }

                @Override
                public void onNext(File file) {
                    dManager.logMe(TAG, "file download onNext: " + file.getAbsolutePath());
                }
            });

这是一个草稿示例

Observable.from(List <Url>)
                .flatMap(Observable.fromCallable(downloadImageHereAndReturnTheFile())
                        .observeOn(AndroidSchedulers.mainThread())
                        // Do on next is where you get notified when each url is getting downloaded
                        .doOnNext(doTheUIOperationHere())
                        .observeOn(Schedulers.io()) //Switch to background thread if you want, in case if you do some other background task otherwise remove the above this and the following next operator 
                        .doOnNext(DoSomeOtherOperationHere()))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<R>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "The entire process of downloading all the image completed here");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(R r) {
                        Log.d(TAG, "You receive the file for each url");
                    }
                });