使用异步 for 循环调用 RxJava 依赖方法
RxJava dependend method calls with async for loop
我想用 RxJava 解决多个依赖调用:
- 获取文件尚未上传的所有实体
- 上传文件到服务器
- 更新 收到的实体 url
- 上传实体到服务器
我尝试了不同的方法,但无法奏效。我不知道如何等到文件上传完成。这是我当前的代码:
Observable.fromIterable(repository.getFileNotUploaded()) // Returns a list of all entities that should be uploaded to the server
.flatMap(entity ->
restService.uploadFile(new File(directory.getPath(), entity.getLocalPath()))
.subscribe(fileUrl -> {
entity.setFileUrl(fileUrl);
repository.update(entity);
}));
// TODO: Wait until all files have been uploaded and the entities have been stored
// locally. Then upload the list of all entities.
休息电话:
public Observable<String> uploadFile(File file) {
return Observable.create(emitter -> {
AsyncTask.execute(new Runnable() {
@Override
public void run() {
commClient.sendFileRequest(URL, file,
response -> {
// ...
if(success){
emitter.onNext(new String(response.data));
}
emitter.onComplete();
}
}
});
});
}
我还读到在平面地图中调用订阅是一种反模式。如何级联我的方法调用?我应该使用范围方法和 return 我迭代的当前位置吗?
编辑 - 工作解决方案感谢 Emanuel S
Emanuel S 的解决方案有效。我还不得不改变我的休息服务。此服务必须 return 一个 Observable<Entity>
而不是 Observable<String>
。另请注意,不应混淆 AsyncTask 和 Rx。
public Observable<Entity> uploadFile(File file, Entity entity) {
//...
entity.setFileUrl(fileUrl);
emitter.onNext(entity);
//...
如果 repository.getFileNotUploaded() 是一个 ArrayList/Collection 你应该用 just 创建你的 Observable 并在之后迭代。
这可能有效(未经测试,在没有 IDE 的情况下编写)并批量上传所有实体。
正如 akarnokd 所写,你不需要使用 just
和 flatmapIterable
,因为你可以只使用 fromIterable
。
Observable.just(repository.getFileNotUploaded()).flatMapIterable ( files -> file)
// OR
Observable.flatmapIterable(repository.getFileNotUploaded())
.flatMap(entity -> rs.uploadFile(new File(yourPath, entity.getLocalPath())) // for each file not uploaded run another observable and return it
.map(entity -> { entity.setFieldUrl(fileUrl); return entity; }) // modify the entity and return it
.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList))
.subscribe(
success -> Log.d("Success", "All entities and files uploaded"),
error -> Log.e("Error", "error happened somewhere")
);
如果您想通过一次调用上传每个修改后的实体,您可能需要替换
.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList ))
与
.flatMapIterable( singleEntity -> uploadSingleEntity(singleEntity))
并且不要将 asyntask 与 RxJava 混用。如果你有 RXJava,你就不需要 AsyncTask。
注意:如果您流式传输数据并且您的存储库 Observable 发出数据,您需要使用
repository.getFileNotUploaded() // Observable<Whatever> emit data in a stream.
.flatMapIterable ( files -> file) ...
我想用 RxJava 解决多个依赖调用:
- 获取文件尚未上传的所有实体
- 上传文件到服务器
- 更新 收到的实体 url
- 上传实体到服务器
我尝试了不同的方法,但无法奏效。我不知道如何等到文件上传完成。这是我当前的代码:
Observable.fromIterable(repository.getFileNotUploaded()) // Returns a list of all entities that should be uploaded to the server
.flatMap(entity ->
restService.uploadFile(new File(directory.getPath(), entity.getLocalPath()))
.subscribe(fileUrl -> {
entity.setFileUrl(fileUrl);
repository.update(entity);
}));
// TODO: Wait until all files have been uploaded and the entities have been stored
// locally. Then upload the list of all entities.
休息电话:
public Observable<String> uploadFile(File file) {
return Observable.create(emitter -> {
AsyncTask.execute(new Runnable() {
@Override
public void run() {
commClient.sendFileRequest(URL, file,
response -> {
// ...
if(success){
emitter.onNext(new String(response.data));
}
emitter.onComplete();
}
}
});
});
}
我还读到在平面地图中调用订阅是一种反模式。如何级联我的方法调用?我应该使用范围方法和 return 我迭代的当前位置吗?
编辑 - 工作解决方案感谢 Emanuel S
Emanuel S 的解决方案有效。我还不得不改变我的休息服务。此服务必须 return 一个 Observable<Entity>
而不是 Observable<String>
。另请注意,不应混淆 AsyncTask 和 Rx。
public Observable<Entity> uploadFile(File file, Entity entity) {
//...
entity.setFileUrl(fileUrl);
emitter.onNext(entity);
//...
如果 repository.getFileNotUploaded() 是一个 ArrayList/Collection 你应该用 just 创建你的 Observable 并在之后迭代。
这可能有效(未经测试,在没有 IDE 的情况下编写)并批量上传所有实体。
正如 akarnokd 所写,你不需要使用 just
和 flatmapIterable
,因为你可以只使用 fromIterable
。
Observable.just(repository.getFileNotUploaded()).flatMapIterable ( files -> file)
// OR
Observable.flatmapIterable(repository.getFileNotUploaded())
.flatMap(entity -> rs.uploadFile(new File(yourPath, entity.getLocalPath())) // for each file not uploaded run another observable and return it
.map(entity -> { entity.setFieldUrl(fileUrl); return entity; }) // modify the entity and return it
.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList))
.subscribe(
success -> Log.d("Success", "All entities and files uploaded"),
error -> Log.e("Error", "error happened somewhere")
);
如果您想通过一次调用上传每个修改后的实体,您可能需要替换
.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList ))
与
.flatMapIterable( singleEntity -> uploadSingleEntity(singleEntity))
并且不要将 asyntask 与 RxJava 混用。如果你有 RXJava,你就不需要 AsyncTask。
注意:如果您流式传输数据并且您的存储库 Observable 发出数据,您需要使用
repository.getFileNotUploaded() // Observable<Whatever> emit data in a stream.
.flatMapIterable ( files -> file) ...