基于内容的 RxJava Observable 缓冲区

RxJava Observable buffer based on content

我开始了一个使用 vertX 和 RxJava 的项目,但我遇到了一个找不到解决方案的问题。

我有一个 Observable,它为传入的通信发出 WebSocketFrame, 每个 WebSocketFrame 都由一个有效负载(一个 ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。

我想对这个 Observable 进行操作,将其转换为发出 ByteBufferd 的 Observable,其中包含每条消息的所有帧。

我尝试了 buffer 方法,但它似乎旨在根据任意标准(时间或其他可观察的)重新组合项目。

另一种方法似乎是使用 compose 订阅 WebSocketFrame 可观察对象,在非结束帧上添加到缓冲区,并在结束帧上添加到 "feed" ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。

因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的知识来提出实现,我将不胜感激。

感谢您的阅读。

我想您必须使用 buffer operator for this (maybe you could do with the simpler buffer, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page 进行更多讨论。希望对您有所帮助!