基于内容的 RxJava Observable 缓冲区
RxJava Observable buffer based on content
我开始了一个使用 vertX 和 RxJava 的项目,但我遇到了一个找不到解决方案的问题。
我有一个 Observable,它为传入的通信发出 WebSocketFrame,
每个 WebSocketFrame 都由一个有效负载(一个 ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。
我想对这个 Observable 进行操作,将其转换为发出 ByteBufferd 的 Observable,其中包含每条消息的所有帧。
我尝试了 buffer
方法,但它似乎旨在根据任意标准(时间或其他可观察的)重新组合项目。
另一种方法似乎是使用 compose
订阅 WebSocketFrame 可观察对象,在非结束帧上添加到缓冲区,并在结束帧上添加到 "feed" ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。
因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的知识来提出实现,我将不胜感激。
感谢您的阅读。
我想您必须使用 buffer
operator for this (maybe you could do with the simpler buffer
, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page 进行更多讨论。希望对您有所帮助!
我开始了一个使用 vertX 和 RxJava 的项目,但我遇到了一个找不到解决方案的问题。
我有一个 Observable,它为传入的通信发出 WebSocketFrame, 每个 WebSocketFrame 都由一个有效负载(一个 ByteBuffer)和指示它是消息的第一帧还是最后一帧的标志组成。
我想对这个 Observable 进行操作,将其转换为发出 ByteBufferd 的 Observable,其中包含每条消息的所有帧。
我尝试了 buffer
方法,但它似乎旨在根据任意标准(时间或其他可观察的)重新组合项目。
另一种方法似乎是使用 compose
订阅 WebSocketFrame 可观察对象,在非结束帧上添加到缓冲区,并在结束帧上添加到 "feed" ByteBuffer Observable。但我不知道如何手动创建和提供缓冲区。
因此,如果有人已经看到这个问题(恕我直言,这似乎很常见)并且对 RxJava 有足够的知识来提出实现,我将不胜感激。
感谢您的阅读。
我想您必须使用 buffer
operator for this (maybe you could do with the simpler buffer
, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page 进行更多讨论。希望对您有所帮助!