如何从阻塞队列创建反应器 Flux?
How can I create reactor Flux from a blocking queue?
我正在尝试实现从 BlockingQueue 创建的反应器 Flux,但不确定哪个运算符最适合我的用例?
我正在创建流式 REST 端点,其中响应是 Flux,它需要不断从 BlockingQueue 发出消息作为对 GET REST 调用的响应。
我已经尝试过论坛和文档,但只能找到从可迭代集合或反应性数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例。
你可以试试Flux#generate and Queue#peek。请记住,如果队列为空,peek
将 return null
,并且不能在 onNext
.
中使用
类似于:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
还有 Flux#repeatWhen 运算符,以防您想在队列被视为空后重新订阅队列,例如与:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
我正在尝试实现从 BlockingQueue 创建的反应器 Flux,但不确定哪个运算符最适合我的用例?
我正在创建流式 REST 端点,其中响应是 Flux,它需要不断从 BlockingQueue 发出消息作为对 GET REST 调用的响应。
我已经尝试过论坛和文档,但只能找到从可迭代集合或反应性数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例。
你可以试试Flux#generate and Queue#peek。请记住,如果队列为空,peek
将 return null
,并且不能在 onNext
.
类似于:
Flux.generate(sink -> {
val element = queue.peek();
if (element == null) {
sink.complete();
} else {
sink.next(element);
}
});
还有 Flux#repeatWhen 运算符,以防您想在队列被视为空后重新订阅队列,例如与:
flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))