flatMapIterable 不会使用 Maybe.error() 恢复发射项目
flatMapIterable does not resume emitting items with Maybe.error()
我有一个要求,我必须在单击按钮时发送已保存的 API 请求。这些 API 请求被添加到一个列表中,如果设备处于离线状态,该列表将保存到 SharedPreferences
。一旦设备重新连接,保存的请求应该通过点击按钮发送。如果其中一个请求获得 HTTP 状态代码 401,则整个过程应该停止。但是,如果出现其他异常,则不应中断该过程,并且应发送列表中保存的下一个请求。如果请求成功,它将从保存的请求列表中删除。在流程结束时,所有未发送的请求都将保存到 SharedPreferences。
现在我有一个异常的特例,我称之为 InvalidRequestException
。当遇到这个特定错误时,我想从列表中删除请求,同时我想继续发送列表中剩余的请求。
我根据 post 建模我的代码。这是启动整个过程的方法的代码:
public LiveData<UploadStatus> startUploading() {
MutableLiveData<UploadStatus> uploadStatus = new MutableLiveData<>();
compositeDisposable.add(paramRepository.getSavedOfflineRequest() // returns Observable<List<Request>>
.doOnComplete(() -> uploadStatus.setValue(UploadStatus.NO_ITEMS))
.flatMapIterable( requests -> {
requestList = requests;
requestListSizeText.set(Integer.toString(requestList.size()));
return requestList;
}) // observable should now be Observable<Request>
.flatMapCompletable(this::uploadProcess)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() ->{
paramRepository.setOfflineRequestString(""); // clear saved offline requests from shared preferences
uploadStatus.setValue(UploadStatus.SUCCESS);
},
error -> {
if (error instanceof SessionExpiredException) {
uploadStatus.setValue(UploadStatus.LOGGED_OUT);
} else {
if(!requestList.isEmpty()) {
paramRepository.saveRequestsToPrefs(requestList);
} else {
paramRepository.deleteSavedRequests();
}
uploadStatus.setValue(UploadStatus.FAIL);
}
}
)
);
return uploadStatus;
}
已保存请求的实际发送发生在 uploadProcess
。这是我尝试捕获 InvalidRequestException
并删除遇到它的请求的地方:
private Completable uploadProcess(Request request) {
return apiService.transact(saleUrl, BuildConfig.ApiKey,request)
.doOnSubscribe(disposable -> {
uploadAttempts++;
})
.toMaybe()
.onErrorResumeNext(error -> {
if(error instanceof InvalidRequestException) {
requestList.remove(request);
if(requestList.isEmpty()) {
return Maybe.error(new OfflineTxnsNotUploadedException());
}
}
else if (error instanceof SessionExpiredException) // inform UI that session has expired
return Maybe.error(error);
else if (requestList.size() == uploadAttempts) { // nothing was uploaded
return Maybe.error(new OfflineTxnsNotUploadedException());
}
return Maybe.empty();
})
.flatMapCompletable(response -> {
requestList.remove(request);
successCount++;
successCountText.set(Integer.toString(successCount));
return createTransaction(request, response);
});
}
现在,当我对此进行测试时,我发现每当遇到 InvalidRequestException
时整个流都会停止,这不是我想要的行为。我想继续发送列表中的其他请求。我实际上删除了从列表中删除请求的部分(requestList.remove(request);
),并且流继续并且下一个请求是通过 apiService.transact()
.
发送的
我是否错误地假设返回 Maybe.empty()
会恢复从 flatMapIterable
发射 Observable<Request>
?
编辑:我似乎遇到了 ConcurrentModificationException
,这就是流立即终止并且其他请求未发送的原因。我将不得不先研究这个例外。
正如我在编辑中指出的那样,我无法捕捉到 ConcurrentModificationException
,因此整个流被中断了。事实上,我正在修改正在发射到个人 Observable<Request>
中的 List<Request>
,因为 requestList
只是 getSavedOfflineRequest
发射的 List<Request>
的浅拷贝。
我尝试的一个解决方案是首先通过 Moshi serialize the list,这样反序列化的列表将不会包含对原始列表的任何引用。我很快发现 requestList.remove(request)
将 return 为假,因为我还没有正确覆盖 Request
[=27= 的 equals()
和 hashCode()
方法].最后,我只是决定创建一个 "failed requests" 列表,每当遇到错误时将 Requests
添加到此列表中。
我有一个要求,我必须在单击按钮时发送已保存的 API 请求。这些 API 请求被添加到一个列表中,如果设备处于离线状态,该列表将保存到 SharedPreferences
。一旦设备重新连接,保存的请求应该通过点击按钮发送。如果其中一个请求获得 HTTP 状态代码 401,则整个过程应该停止。但是,如果出现其他异常,则不应中断该过程,并且应发送列表中保存的下一个请求。如果请求成功,它将从保存的请求列表中删除。在流程结束时,所有未发送的请求都将保存到 SharedPreferences。
现在我有一个异常的特例,我称之为 InvalidRequestException
。当遇到这个特定错误时,我想从列表中删除请求,同时我想继续发送列表中剩余的请求。
我根据
public LiveData<UploadStatus> startUploading() {
MutableLiveData<UploadStatus> uploadStatus = new MutableLiveData<>();
compositeDisposable.add(paramRepository.getSavedOfflineRequest() // returns Observable<List<Request>>
.doOnComplete(() -> uploadStatus.setValue(UploadStatus.NO_ITEMS))
.flatMapIterable( requests -> {
requestList = requests;
requestListSizeText.set(Integer.toString(requestList.size()));
return requestList;
}) // observable should now be Observable<Request>
.flatMapCompletable(this::uploadProcess)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() ->{
paramRepository.setOfflineRequestString(""); // clear saved offline requests from shared preferences
uploadStatus.setValue(UploadStatus.SUCCESS);
},
error -> {
if (error instanceof SessionExpiredException) {
uploadStatus.setValue(UploadStatus.LOGGED_OUT);
} else {
if(!requestList.isEmpty()) {
paramRepository.saveRequestsToPrefs(requestList);
} else {
paramRepository.deleteSavedRequests();
}
uploadStatus.setValue(UploadStatus.FAIL);
}
}
)
);
return uploadStatus;
}
已保存请求的实际发送发生在 uploadProcess
。这是我尝试捕获 InvalidRequestException
并删除遇到它的请求的地方:
private Completable uploadProcess(Request request) {
return apiService.transact(saleUrl, BuildConfig.ApiKey,request)
.doOnSubscribe(disposable -> {
uploadAttempts++;
})
.toMaybe()
.onErrorResumeNext(error -> {
if(error instanceof InvalidRequestException) {
requestList.remove(request);
if(requestList.isEmpty()) {
return Maybe.error(new OfflineTxnsNotUploadedException());
}
}
else if (error instanceof SessionExpiredException) // inform UI that session has expired
return Maybe.error(error);
else if (requestList.size() == uploadAttempts) { // nothing was uploaded
return Maybe.error(new OfflineTxnsNotUploadedException());
}
return Maybe.empty();
})
.flatMapCompletable(response -> {
requestList.remove(request);
successCount++;
successCountText.set(Integer.toString(successCount));
return createTransaction(request, response);
});
}
现在,当我对此进行测试时,我发现每当遇到 InvalidRequestException
时整个流都会停止,这不是我想要的行为。我想继续发送列表中的其他请求。我实际上删除了从列表中删除请求的部分(requestList.remove(request);
),并且流继续并且下一个请求是通过 apiService.transact()
.
我是否错误地假设返回 Maybe.empty()
会恢复从 flatMapIterable
发射 Observable<Request>
?
编辑:我似乎遇到了 ConcurrentModificationException
,这就是流立即终止并且其他请求未发送的原因。我将不得不先研究这个例外。
正如我在编辑中指出的那样,我无法捕捉到 ConcurrentModificationException
,因此整个流被中断了。事实上,我正在修改正在发射到个人 Observable<Request>
中的 List<Request>
,因为 requestList
只是 getSavedOfflineRequest
发射的 List<Request>
的浅拷贝。
我尝试的一个解决方案是首先通过 Moshi serialize the list,这样反序列化的列表将不会包含对原始列表的任何引用。我很快发现 requestList.remove(request)
将 return 为假,因为我还没有正确覆盖 Request
[=27= 的 equals()
和 hashCode()
方法].最后,我只是决定创建一个 "failed requests" 列表,每当遇到错误时将 Requests
添加到此列表中。