RxJava2 Flowable:将对象逐个发送到服务器并检测结束

RxJava2 Flowable : send objects to server on by one and detect end

我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,所以我使用可流动的使用请求(1)从数组列表中一个一个地发送它们。

对于每个对象,都会对服务器进行改造调用,在 return 中,我获取远程 ID,并使用远程 ID 更新本地对象。

我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。

目前一切正常,但我在从远程服务器收到答案之前收到 "completed" 消息,因此在对象更新之前。

我该怎么做?

Flowable<Integer> observable = Flowable.range(0, objList.size());

        observable.subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
                MyObj = objList.get(t);

                RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            request(1);
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty()).subscribe();
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                request(1);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });

您应该将改造调用移到 map(...) 运算符中:

Flowable<Integer> observable = Flowable.range(0, objList.size());

observable
     .map(t -> {
            MyObj = objList.get(t);

            return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty())
      })
     .subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });

您正在 onNext(...) 中执行改造调用,因此您的网络响应可能不是连续的。通过使用 map(...) 运算符转换您的可观察对象,每个发射都将成为一个单独的网络调用。这允许您的 onNext(...) 函数打印改造调用的顺序结果,并允许您的 onComplete() 在所有后续调用完成时执行。

onNextmap 中调用 subscribe 通常是错误的做法,表明您应该在其中使用 flatMapconcatMap上游。在这种情况下,可以使用 concatMap,因为它只会 运行 一个内部源,即你的改造调用,并且只有在这个完成后才执行下一个。

Flowable.fromIterable(objList)
.concatMap(item ->
     RetrofitHelper.createService(ObjService.class, true, authType, authToken)
     .createOrUpdateObj(item)
     .flatMap(p -> {
         if (p != null) {
             try {
                 p.setSyncho(true);
                 // save remote id on obj
                 ObjDB.updateObj(p);

                 return Observable.just(item);
             } catch (Throwable th) {
                 ExceptionHandler.logException(th);
                 return Observable.<Integer>error(th);
             }
         } else {
             return Observable.<Integer>empty();  // provisoirement si pb on renvoie vide
         }
     })
     .onErrorResumeNext(Observable.<Integer>empty())
     .toFlowable(BackpressureStrategy.BUFFER)
)
.subscribe(new DefaultSubscriber<Integer>() {
     @Override
     public void onStart() {
            Log.d(TAG, "on start");
     }

     @Override
     public void onNext(Integer t) {
         Log.d(TAG, "on next : " + t);
     }


     @Override
     public void onError(Throwable t) {
        Log.e("XXX ERROR ", "" + t);
        patientSynchroInProgress = Boolean.FALSE;
     }

     @Override
     public void onComplete() {
         Log.e("XXX COMPLETE", "complete");
     }
});

我终于成功了

  Flowable.fromIterable(patientList)
                    .concatMap(item -> {

                                item.setSomething();
                                return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
                                        .createOrUpdateObj(item)
                                        .flatMap(p -> {
                                            if (p != null) {
                                                try {
                                                    p.setSyncho(true);
                                                    // save remote id on obj
                                                    ObjDB.updateObj(p);
                                                    return Observable.empty();
                                                } catch (Throwable th) {
                                                    ExceptionHandler.logException(th);
                                                    return Observable.error(th);
                                                }
                                            } else {
                                                return Observable.empty();  // provisoirement si pb on renvoie vide
                                            }
                                        })

                                        .onErrorResumeNext(Observable.empty())
                                        .toFlowable(BackpressureStrategy.BUFFER);
                            }
                    )
                    .doOnNext(s -> {
                        Log.d(TAG, ((Obj) s).toString());

                    })
                    .doOnComplete(() -> {
                        // do something when completed
                        Log.d(TAG, "COMPLETE");
                    })
                    .subscribe();
        }
    }

感谢您的帮助