使用 replay(selectorFoo) 但不使用 publish(selectorFoo) 时 OOM
OOM when using replay(selectorFoo) but not publish(selectorFoo)
这会因 OOM 而崩溃:
Flowable.range(1, 5000)
.map(__ -> new byte[1024 * 1024])
.replay(
fb ->
fb.take(1)
.concatMap(__ -> fb)
,1
)
.count()
.toFlowable()
.blockingSubscribe(c -> System.out.println("args = [" + c + "]"));
我认为这是因为 replay
坚持来自上游的排放,尽管我认为 1
缓冲区大小提示不会...什么我是不是不见了?
这不会崩溃:
Flowable.range(1, 5000)
.map(__ -> new byte[1024 * 1024])
.publish(
fb ->
fb.take(1)
.concatMap(first -> fb.startWith(first))
,1
)
.count()
.toFlowable()
.blockingSubscribe(c -> System.out.println("args = [" + c + "]"));
但我不确定我是否能保证我会像那样从上游获得所有排放...
我对此进行了调查并找到了问题的原因:RxJava 2 replay
中的错误。
发生的事情是 replay
持有对 2 个订阅者的引用,一个用于 take
,另一个用于本地变量中 concatMap
的内部消费者,因此有一个从主线程到失效 take
的 GC root 仍然引用第一个项目。由于有界重播使用链表,因此第一个项目通过其 "next" 链接不断引用更新的项目并最终耗尽内存。
publish
不保留对旧值的引用,因此这不是问题。
这会因 OOM 而崩溃:
Flowable.range(1, 5000)
.map(__ -> new byte[1024 * 1024])
.replay(
fb ->
fb.take(1)
.concatMap(__ -> fb)
,1
)
.count()
.toFlowable()
.blockingSubscribe(c -> System.out.println("args = [" + c + "]"));
我认为这是因为 replay
坚持来自上游的排放,尽管我认为 1
缓冲区大小提示不会...什么我是不是不见了?
这不会崩溃:
Flowable.range(1, 5000)
.map(__ -> new byte[1024 * 1024])
.publish(
fb ->
fb.take(1)
.concatMap(first -> fb.startWith(first))
,1
)
.count()
.toFlowable()
.blockingSubscribe(c -> System.out.println("args = [" + c + "]"));
但我不确定我是否能保证我会像那样从上游获得所有排放...
我对此进行了调查并找到了问题的原因:RxJava 2 replay
中的错误。
发生的事情是 replay
持有对 2 个订阅者的引用,一个用于 take
,另一个用于本地变量中 concatMap
的内部消费者,因此有一个从主线程到失效 take
的 GC root 仍然引用第一个项目。由于有界重播使用链表,因此第一个项目通过其 "next" 链接不断引用更新的项目并最终耗尽内存。
publish
不保留对旧值的引用,因此这不是问题。