Android改造并行文件下载问题
Android Retrofit parallel file download issue
我的应用程序中有一个用于从服务器下载两个 zip 文件的用例。为此,我一直在使用 retrofit+rxjava(创建了两个单独的改造服务)。对于并行执行,我一直在新线程中订阅改造服务,然后使用 zip 运算符将其组合。它工作正常。但是后来我在两个服务中都添加了 map 运算符以进行解压缩操作,但它不执行在 map 运算符中编写的代码,并且控件直接传递给 zip 操作。我不知道如何解决这个问题,我是反应世界的新手。
到目前为止我尝试了什么
Observable<Response<ResponseBody>> dFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
dFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source());
String dealerFilePath = dealerZipPath+File.separator+filename;
unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return dealerFilePath;
}
});
Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());;
generalFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source());
String generalFilePath = generalZipPath+File.separator+filename;
unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return generalFilePath;
}
});
Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception {
System.out.println("zipped yess");
return null;
}
}).observeOn(Schedulers.io());
zipped.subscribe(getObserver());
和 getObserver() 函数
private Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
System.out.println("------------total time-----------");
System.out.println("result value-->"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
当代码执行时,控制被转移到 zip 运算符中的 apply() 函数,并且两个 observable 中的 map 运算符都没有被执行。
还有一个问题
我是merging/zipping这两个observable,传递给operator的类型是Response<"ResponseBody">。实际上我需要那里下载的文件路径(字符串类型),为此我应该怎么做?
**
Updated the Solution as described by @Yaroslav Stavnichiy and now its working
**
Observable<String> deObservable = dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA,
params.getNumber(),params.getId(),params.getCtId(), params.getSessionId())
.flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception {
String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME,
StrConstants.FILE_NAME,responseBody.body().source());
return Observable.just(zipPath);
}
}).map(new Function<String, String>() {
@Override
public String apply(String filePath) throws Exception {
String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+
File.separator+ StrConstants.FOLDER_NAME;
unzipUtility.unzip(filePath, unzipDestinationPath);
return unzipDestinationPath;
}
}).subscribeOn(Schedulers.newThread());
您有效地做的是:
Observable a = ...;
Observable b = ...;
a.map(...);
b.map(...);
Observable.zip(a, b).subscribe(f);
map()
(以及所有其他 rx 运算符)不改变源。它 returns 可用于进一步计算的新可观察对象。在您的代码中,您忽略了那些返回的对象。您压缩的是原始可观察对象,而不是映射的可观察对象,这就是映射器函数未被调用的原因。
我认为您想执行以下操作:
Observable a = ... .map(...);
Observable b = ... .map(...);
Observable.zip(a, b).subscribe(f);
我的应用程序中有一个用于从服务器下载两个 zip 文件的用例。为此,我一直在使用 retrofit+rxjava(创建了两个单独的改造服务)。对于并行执行,我一直在新线程中订阅改造服务,然后使用 zip 运算符将其组合。它工作正常。但是后来我在两个服务中都添加了 map 运算符以进行解压缩操作,但它不执行在 map 运算符中编写的代码,并且控件直接传递给 zip 操作。我不知道如何解决这个问题,我是反应世界的新手。
到目前为止我尝试了什么
Observable<Response<ResponseBody>> dFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
dFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source());
String dealerFilePath = dealerZipPath+File.separator+filename;
unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return dealerFilePath;
}
});
Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA,
params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());;
generalFileObservable.map(new Function<Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBody) throws Exception {
String header = responseBody.headers().get("Content-Disposition");
String filename = header.replace("attachment; filename=", "");
String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME);
fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source());
String generalFilePath = generalZipPath+File.separator+filename;
unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
return generalFilePath;
}
});
Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() {
@Override
public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception {
System.out.println("zipped yess");
return null;
}
}).observeOn(Schedulers.io());
zipped.subscribe(getObserver());
和 getObserver() 函数
private Observer<String> getObserver(){
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
System.out.println("------------total time-----------");
System.out.println("result value-->"+value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
当代码执行时,控制被转移到 zip 运算符中的 apply() 函数,并且两个 observable 中的 map 运算符都没有被执行。
还有一个问题
我是merging/zipping这两个observable,传递给operator的类型是Response<"ResponseBody">。实际上我需要那里下载的文件路径(字符串类型),为此我应该怎么做?
**
Updated the Solution as described by @Yaroslav Stavnichiy and now its working
**
Observable<String> deObservable = dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA,
params.getNumber(),params.getId(),params.getCtId(), params.getSessionId())
.flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception {
String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME,
StrConstants.FILE_NAME,responseBody.body().source());
return Observable.just(zipPath);
}
}).map(new Function<String, String>() {
@Override
public String apply(String filePath) throws Exception {
String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+
File.separator+ StrConstants.FOLDER_NAME;
unzipUtility.unzip(filePath, unzipDestinationPath);
return unzipDestinationPath;
}
}).subscribeOn(Schedulers.newThread());
您有效地做的是:
Observable a = ...;
Observable b = ...;
a.map(...);
b.map(...);
Observable.zip(a, b).subscribe(f);
map()
(以及所有其他 rx 运算符)不改变源。它 returns 可用于进一步计算的新可观察对象。在您的代码中,您忽略了那些返回的对象。您压缩的是原始可观察对象,而不是映射的可观察对象,这就是映射器函数未被调用的原因。
我认为您想执行以下操作:
Observable a = ... .map(...);
Observable b = ... .map(...);
Observable.zip(a, b).subscribe(f);