使用 replay(selectorFoo) 但不使用 publish(selectorFoo) 时 OOM

OOM when using replay(selectorFoo) but not publish(selectorFoo)

这会因 OOM 而崩溃:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .replay(
          fb ->
            fb.take(1)
              .concatMap(__ -> fb)
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

我认为这是因为 replay 坚持来自上游的排放,尽管我认为 1 缓冲区大小提示不会...什么我是不是不见了?

这不会崩溃:

Flowable.range(1, 5000)
        .map(__ -> new byte[1024 * 1024])
        .publish(
          fb ->
            fb.take(1)
              .concatMap(first -> fb.startWith(first))
          ,1
        )
        .count()
        .toFlowable()
        .blockingSubscribe(c -> System.out.println("args = [" + c + "]"));

但我不确定我是否能保证我会像那样从上游获得所有排放...

我对此进行了调查并找到了问题的原因:RxJava 2 replay 中的错误。

发生的事情是 replay 持有对 2 个订阅者的引用,一个用于 take,另一个用于本地变量中 concatMap 的内部消费者,因此有一个从主线程到失效 take 的 GC root 仍然引用第一个项目。由于有界重播使用链表,因此第一个项目通过其 "next" 链接不断引用更新的项目并最终耗尽内存。

publish 不保留对旧值的引用,因此这不是问题。