将异步调用转换为RxJava,处理完所有项目后如何手动完成?
Converting an async call to RxJava, how to manually complete after all items are processed?
我有一个自定义 class 用于将单个文件或多个文件上传到 Firebase 存储:
public class FirebaseUploader {
private Context appContext;
private FirebaseStorage storage;
public FirebaseUploader(Context appContext) {
this.appContext = appContext;
this.storage = FirebaseStorage.getInstance();
}
// Uploading a single file
public Observable<String> send(Uri file) {
return toObservable(Observable.just(file));
}
// Uploading multiple files
public Observable<String> send(List<Uri> files) {
return toObservable(Observable.fromIterable(files));
}
private Observable<String> toObservable(Observable<Uri> observable) {
return observable.flatMap(this::upload);
}
private Observable<String> upload(Uri file) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
StorageReference branch = storage.getReference().child("images");
StorageReference leaf = branch.child(generateUniqueId(file));
UploadTask uploadTask = leaf.putFile(file);
uploadTask.continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
@Override
public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
if (!task.isSuccessful()) emitter.onError(task.getException());
return leaf.getDownloadUrl();
}
}).addOnCompleteListener(new OnCompleteListener<Uri>() {
@Override
public void onComplete(@NonNull Task<Uri> task) {
if (task.isSuccessful()) {
Uri downloadUri = task.getResult();
String url = downloadUri.toString();
emitter.onNext(url);
//emitter.onComplete();
} else {
// Handle failures
// ...
}
}
});
}
});
}
private String generateUniqueId(Uri file) {
String fileType = appContext.getContentResolver().getType(file);
String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
String fileName = UUID.randomUUID().toString();
return fileName + extension;
}
}
实际上我花了几天时间才想出这个解决方案,因为我最初不知道如何将 Firebase 异步调用转换为 RxJava。然后我想知道为什么 observable 没有完成很长一段时间,直到我最终意识到我正在调用 emitter.onNext
和 emitter.onError
,但我的代码中没有任何地方 emitter.onComplete
!
所以 emitter.onComplete
在 OnCompleteListener
里面。但它被注释掉了——那不是那个调用应该出现的地方。我一直在寻找的是一种仅在处理完所有文件后才在 emitter
上调用 .onComplete()
的方法。由于我对 RxJava 的经验有限,我看到的唯一方法是首先计算要上传的文件数量,然后在达到该数量时手动完成序列。有没有更好的办法?
你的 emitter.onComplete()
正是它应该在的地方。
从您的 upload
方法返回的 Observable 只负责一件事:上传由 Uri 参数指定的单个文件。该任务在 UploadTask
完成时完成,因此您应该在那个时候调用 onComplete
。
flatMap
操作员负责将所有这些结果收集到一个 Observable
中。当所有文件都上传后,它将向下游发出 onComplete
,这正是您想要的。
我有一个自定义 class 用于将单个文件或多个文件上传到 Firebase 存储:
public class FirebaseUploader {
private Context appContext;
private FirebaseStorage storage;
public FirebaseUploader(Context appContext) {
this.appContext = appContext;
this.storage = FirebaseStorage.getInstance();
}
// Uploading a single file
public Observable<String> send(Uri file) {
return toObservable(Observable.just(file));
}
// Uploading multiple files
public Observable<String> send(List<Uri> files) {
return toObservable(Observable.fromIterable(files));
}
private Observable<String> toObservable(Observable<Uri> observable) {
return observable.flatMap(this::upload);
}
private Observable<String> upload(Uri file) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
StorageReference branch = storage.getReference().child("images");
StorageReference leaf = branch.child(generateUniqueId(file));
UploadTask uploadTask = leaf.putFile(file);
uploadTask.continueWithTask(new Continuation<UploadTask.TaskSnapshot, Task<Uri>>() {
@Override
public Task<Uri> then(@NonNull Task<UploadTask.TaskSnapshot> task) throws Exception {
if (!task.isSuccessful()) emitter.onError(task.getException());
return leaf.getDownloadUrl();
}
}).addOnCompleteListener(new OnCompleteListener<Uri>() {
@Override
public void onComplete(@NonNull Task<Uri> task) {
if (task.isSuccessful()) {
Uri downloadUri = task.getResult();
String url = downloadUri.toString();
emitter.onNext(url);
//emitter.onComplete();
} else {
// Handle failures
// ...
}
}
});
}
});
}
private String generateUniqueId(Uri file) {
String fileType = appContext.getContentResolver().getType(file);
String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
String fileName = UUID.randomUUID().toString();
return fileName + extension;
}
}
实际上我花了几天时间才想出这个解决方案,因为我最初不知道如何将 Firebase 异步调用转换为 RxJava。然后我想知道为什么 observable 没有完成很长一段时间,直到我最终意识到我正在调用 emitter.onNext
和 emitter.onError
,但我的代码中没有任何地方 emitter.onComplete
!
所以 emitter.onComplete
在 OnCompleteListener
里面。但它被注释掉了——那不是那个调用应该出现的地方。我一直在寻找的是一种仅在处理完所有文件后才在 emitter
上调用 .onComplete()
的方法。由于我对 RxJava 的经验有限,我看到的唯一方法是首先计算要上传的文件数量,然后在达到该数量时手动完成序列。有没有更好的办法?
你的 emitter.onComplete()
正是它应该在的地方。
从您的 upload
方法返回的 Observable 只负责一件事:上传由 Uri 参数指定的单个文件。该任务在 UploadTask
完成时完成,因此您应该在那个时候调用 onComplete
。
flatMap
操作员负责将所有这些结果收集到一个 Observable
中。当所有文件都上传后,它将向下游发出 onComplete
,这正是您想要的。