RxJava2 Flowable:将对象逐个发送到服务器并检测结束
RxJava2 Flowable : send objects to server on by one and detect end
我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,所以我使用可流动的使用请求(1)从数组列表中一个一个地发送它们。
对于每个对象,都会对服务器进行改造调用,在 return 中,我获取远程 ID,并使用远程 ID 更新本地对象。
我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。
目前一切正常,但我在从远程服务器收到答案之前收到 "completed" 消息,因此在对象更新之前。
我该怎么做?
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
MyObj = objList.get(t);
RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty()).subscribe();
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
您应该将改造调用移到 map(...)
运算符中:
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable
.map(t -> {
MyObj = objList.get(t);
return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty())
})
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
您正在 onNext(...)
中执行改造调用,因此您的网络响应可能不是连续的。通过使用 map(...)
运算符转换您的可观察对象,每个发射都将成为一个单独的网络调用。这允许您的 onNext(...)
函数打印改造调用的顺序结果,并允许您的 onComplete()
在所有后续调用完成时执行。
从 onNext
或 map
中调用 subscribe
通常是错误的做法,表明您应该在其中使用 flatMap
或 concatMap
上游。在这种情况下,可以使用 concatMap
,因为它只会 运行 一个内部源,即你的改造调用,并且只有在这个完成后才执行下一个。
Flowable.fromIterable(objList)
.concatMap(item ->
RetrofitHelper.createService(ObjService.class, true, authType, authToken)
.createOrUpdateObj(item)
.flatMap(p -> {
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.just(item);
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.<Integer>error(th);
}
} else {
return Observable.<Integer>empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(Observable.<Integer>empty())
.toFlowable(BackpressureStrategy.BUFFER)
)
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
我终于成功了
Flowable.fromIterable(patientList)
.concatMap(item -> {
item.setSomething();
return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
.createOrUpdateObj(item)
.flatMap(p -> {
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(Observable.empty())
.toFlowable(BackpressureStrategy.BUFFER);
}
)
.doOnNext(s -> {
Log.d(TAG, ((Obj) s).toString());
})
.doOnComplete(() -> {
// do something when completed
Log.d(TAG, "COMPLETE");
})
.subscribe();
}
}
感谢您的帮助
我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,所以我使用可流动的使用请求(1)从数组列表中一个一个地发送它们。
对于每个对象,都会对服务器进行改造调用,在 return 中,我获取远程 ID,并使用远程 ID 更新本地对象。
我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。
目前一切正常,但我在从远程服务器收到答案之前收到 "completed" 消息,因此在对象更新之前。
我该怎么做?
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
MyObj = objList.get(t);
RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty()).subscribe();
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
您应该将改造调用移到 map(...)
运算符中:
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable
.map(t -> {
MyObj = objList.get(t);
return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty())
})
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
您正在 onNext(...)
中执行改造调用,因此您的网络响应可能不是连续的。通过使用 map(...)
运算符转换您的可观察对象,每个发射都将成为一个单独的网络调用。这允许您的 onNext(...)
函数打印改造调用的顺序结果,并允许您的 onComplete()
在所有后续调用完成时执行。
从 onNext
或 map
中调用 subscribe
通常是错误的做法,表明您应该在其中使用 flatMap
或 concatMap
上游。在这种情况下,可以使用 concatMap
,因为它只会 运行 一个内部源,即你的改造调用,并且只有在这个完成后才执行下一个。
Flowable.fromIterable(objList)
.concatMap(item ->
RetrofitHelper.createService(ObjService.class, true, authType, authToken)
.createOrUpdateObj(item)
.flatMap(p -> {
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.just(item);
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.<Integer>error(th);
}
} else {
return Observable.<Integer>empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(Observable.<Integer>empty())
.toFlowable(BackpressureStrategy.BUFFER)
)
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
我终于成功了
Flowable.fromIterable(patientList)
.concatMap(item -> {
item.setSomething();
return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
.createOrUpdateObj(item)
.flatMap(p -> {
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(Observable.empty())
.toFlowable(BackpressureStrategy.BUFFER);
}
)
.doOnNext(s -> {
Log.d(TAG, ((Obj) s).toString());
})
.doOnComplete(() -> {
// do something when completed
Log.d(TAG, "COMPLETE");
})
.subscribe();
}
}
感谢您的帮助