Couchbase error: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value

Couchbase error: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value

我有一个 Spring-boot 应用程序可以将数据保存到 Couchbase 服务器。

我喜欢使用此代码节省大约 50 万学生:

for (String student: students) {
  ...
  bucket.upsert(JsonDocument.create(<unique key here>, TTL, JsonObject.fromJson(studentAsJson)));                
}

这适用于少量学生(最多 100K +-)。

但是每当应用程序达到大约 110K-140K 学生时,我都会收到此错误:

2018-12-23 16:20:57.804 ERROR 17468 --- [nio-8989-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.couchbase.client.java.error.TemporaryFailureException] with root cause

rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.setProducer(OnSubscribeTimeoutTimedWithFallback.java:155) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access[=11=]0(AbstractGenericHandler.java:86) ~[core-io-1.7.1.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.call(AbstractGenericHandler.java:526) ~[core-io-1.7.1.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_161]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

我的 Couchbase 是本地主机上 Docker 中的 运行,我想知道这是真正的问题还是只是由于 docker 和本地主机的限制。

从未使用过 CouchBase,但似乎是背压问题 (https://github.com/ReactiveX/RxJava/wiki/Backpressure-(2.0))。

所以基本上您的消费者(在本例中为 CouchBase)无法像您提供给它的那样快地使用数据。从日志中可以看出你正在使用 RxJava1。我建议升级到 RxJava2。由于更改,这将需要一些时间,但您将可以访问提供开箱即用的 BackPressure 的 Flowable(http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html)。

通过设置带有背压的 Flowable,您将能够解决慢速消费者的问题,因为它允许您等待消费者消费数据,然后将新数据发送给它。

如果您无法更新 RxJava 的版本,David Karnok 的这篇文章值得一看,因为它展示了无需背压即可处理此链的方法:https://github.com/ReactiveX/RxJava/wiki/Backpressure

希望对您有所帮助。

所以在代码到达 "regular" 环境后,使用不在容器内运行的真实数据库,问题就消失了。