flatMapIterable 不会使用 Maybe.error() 恢复发射项目

flatMapIterable does not resume emitting items with Maybe.error()

我有一个要求,我必须在单击按钮时发送已保存的 API 请求。这些 API 请求被添加到一个列表中,如果设备处于离线状态,该列表将保存到 SharedPreferences。一旦设备重新连接,保存的请求应该通过点击按钮发送。如果其中一个请求获得 HTTP 状态代码 401,则整个过程应该停止。但是,如果出现其他异常,则不应中断该过程,并且应发送列表中保存的下一个请求。如果请求成功,它将从保存的请求列表中删除。在流程结束时,所有未发送的请求都将保存到 SharedPreferences。

现在我有一个异常的特例,我称之为 InvalidRequestException。当遇到这个特定错误时,我想从列表中删除请求,同时我想继续发送列表中剩余的请求。

我根据 post 建模我的代码。这是启动整个过程的方法的代码:

public LiveData<UploadStatus> startUploading() {
MutableLiveData<UploadStatus> uploadStatus = new MutableLiveData<>();

compositeDisposable.add(paramRepository.getSavedOfflineRequest()    // returns Observable<List<Request>>
    .doOnComplete(() -> uploadStatus.setValue(UploadStatus.NO_ITEMS))
    .flatMapIterable( requests -> {
        requestList = requests;
        requestListSizeText.set(Integer.toString(requestList.size()));
        return requestList;
    })                                                              // observable should now be Observable<Request>         
    .flatMapCompletable(this::uploadProcess)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(() ->{
            paramRepository.setOfflineRequestString("");            // clear saved offline requests from shared preferences
            uploadStatus.setValue(UploadStatus.SUCCESS);
        },
        error -> {
            if (error instanceof SessionExpiredException) {
                uploadStatus.setValue(UploadStatus.LOGGED_OUT);
            } else {
                if(!requestList.isEmpty()) {
                    paramRepository.saveRequestsToPrefs(requestList);
                } else { 
                    paramRepository.deleteSavedRequests();
                }
                uploadStatus.setValue(UploadStatus.FAIL);
            }
        }
    )
);

return uploadStatus;

}

已保存请求的实际发送发生在 uploadProcess。这是我尝试捕获 InvalidRequestException 并删除遇到它的请求的地方:

private Completable uploadProcess(Request request) {

    return apiService.transact(saleUrl, BuildConfig.ApiKey,request)
        .doOnSubscribe(disposable -> {
            uploadAttempts++;
        })
        .toMaybe()
        .onErrorResumeNext(error -> {
            if(error instanceof InvalidRequestException) {
                requestList.remove(request);

                if(requestList.isEmpty()) {
                    return Maybe.error(new OfflineTxnsNotUploadedException());
                }
            }
            else if (error instanceof SessionExpiredException)                // inform UI that session has expired
                return Maybe.error(error);
            else if (requestList.size() == uploadAttempts)  {                 // nothing was uploaded
                return Maybe.error(new OfflineTxnsNotUploadedException());
            }
            return Maybe.empty();
        })
        .flatMapCompletable(response -> {
            requestList.remove(request);
            successCount++;
            successCountText.set(Integer.toString(successCount));
            return createTransaction(request, response);
        });
}

现在,当我对此进行测试时,我发现每当遇到 InvalidRequestException 时整个流都会停止,这不是我想要的行为。我想继续发送列表中的其他请求。我实际上删除了从列表中删除请求的部分(requestList.remove(request);),并且流继续并且下一个请求是通过 apiService.transact().

发送的

我是否错误地假设返回 Maybe.empty() 会恢复从 flatMapIterable 发射 Observable<Request>

编辑:我似乎遇到了 ConcurrentModificationException,这就是流立即终止并且其他请求未发送的原因。我将不得不先研究这个例外。

正如我在编辑中指出的那样,我无法捕捉到 ConcurrentModificationException,因此整个流被中断了。事实上,我正在修改正在发射到个人 Observable<Request> 中的 List<Request>,因为 requestList 只是 getSavedOfflineRequest 发射的 List<Request> 的浅拷贝。

我尝试的一个解决方案是首先通过 Moshi serialize the list,这样反序列化的列表将不会包含对原始列表的任何引用。我很快发现 requestList.remove(request) 将 return 为假,因为我还没有正确覆盖 Request [=27= 的 equals()hashCode() 方法].最后,我只是决定创建一个 "failed requests" 列表,每当遇到错误时将 Requests 添加到此列表中。