如何修复 "MissingBackpressureException"
How to fix "MissingBackpressureException"
我正在使用 RxJava2 Flowables,通过订阅来自 PublishSubject.It 在企业级应用程序中使用的事件流,我们无法选择删除任何事件。
我正在使用版本 RxJava 2.2.8
我正在使用 BackpressureStrategy.BUFFER,因为我不想失去任何活动。
此外,我再次缓冲 50000 或 3 分钟,以较早者为准。我这样做是因为我想合并事件然后处理它们。
但是我在 运行
的几分钟内收到以下错误
io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
我尝试通过设置增加缓冲区大小,但行为没有任何变化。
System.setProperty("rx2.buffer-size", "524288");
此外,如果我缓冲了更长的时间而不是 3 分钟,我会在更长的时间后得到异常,这可能是因为当事件合并得更多时,我的下游性能更好。但是,我没有那个选择,因为这些是实时事件,需要立即处理(3-5 分钟内)。
我在调用 "subscription.next" 之前也尝试了 thread.sleep() 以防出错,但仍然得到相同的结果。
keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {
@Override
public void onSubscribe(Subscription var1) {
innerSubscription = var1;
innerSubscription.request(1L);
}
@Override
public void onNext(List<String> logs) {
Subscription.request(1L);
/// Do some logic here
}
我想知道如何处理背压以避免此异常?这个异常是因为“.buffer”方法吗
有没有办法让我检查这些缓冲区的状态。还有为什么即使我增加 rx2.buffer-size,我仍然会在相同的时间内得到异常。理想情况下,如果异常是因为缓冲区已满,系统应该 运行 更长的时间和更大的缓冲区大小。
任何关于此消息的原因的帮助 "Could not emit buffer due to lack of requests at " 都将非常有用。
问题是,为什么您使用的主题不是 backpressure-aware?您是否将其用作穷人的活动巴士?另外,假设 e.getContents()
是一个简单的 getter 我相信你可以替换整个块
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });
与
.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();
我正在使用 RxJava2 Flowables,通过订阅来自 PublishSubject.It 在企业级应用程序中使用的事件流,我们无法选择删除任何事件。 我正在使用版本 RxJava 2.2.8
我正在使用 BackpressureStrategy.BUFFER,因为我不想失去任何活动。
此外,我再次缓冲 50000 或 3 分钟,以较早者为准。我这样做是因为我想合并事件然后处理它们。
但是我在 运行
的几分钟内收到以下错误io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
我尝试通过设置增加缓冲区大小,但行为没有任何变化。
System.setProperty("rx2.buffer-size", "524288");
此外,如果我缓冲了更长的时间而不是 3 分钟,我会在更长的时间后得到异常,这可能是因为当事件合并得更多时,我的下游性能更好。但是,我没有那个选择,因为这些是实时事件,需要立即处理(3-5 分钟内)。
我在调用 "subscription.next" 之前也尝试了 thread.sleep() 以防出错,但仍然得到相同的结果。
keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {
@Override
public void onSubscribe(Subscription var1) {
innerSubscription = var1;
innerSubscription.request(1L);
}
@Override
public void onNext(List<String> logs) {
Subscription.request(1L);
/// Do some logic here
}
我想知道如何处理背压以避免此异常?这个异常是因为“.buffer”方法吗 有没有办法让我检查这些缓冲区的状态。还有为什么即使我增加 rx2.buffer-size,我仍然会在相同的时间内得到异常。理想情况下,如果异常是因为缓冲区已满,系统应该 运行 更长的时间和更大的缓冲区大小。
任何关于此消息的原因的帮助 "Could not emit buffer due to lack of requests at " 都将非常有用。
问题是,为什么您使用的主题不是 backpressure-aware?您是否将其用作穷人的活动巴士?另外,假设 e.getContents()
是一个简单的 getter 我相信你可以替换整个块
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });
与
.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();