如何从阻塞队列创建反应器 Flux?

How can I create reactor Flux from a blocking queue?

我正在尝试实现从 BlockingQueue 创建的反应器 Flux,但不确定哪个运算符最适合我的用例?

我正在创建流式 REST 端点,其中响应是 Flux,它需要不断从 BlockingQueue 发出消息作为对 GET REST 调用的响应。

我已经尝试过论坛和文档,但只能找到从可迭代集合或反应性数据源启动的 Flux,但没有来自任何 BlockingQueue 的示例。

你可以试试Flux#generate and Queue#peek。请记住,如果队列为空,peek 将 return null,并且不能在 onNext.

中使用

类似于:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});

还有 Flux#repeatWhen 运算符,以防您想在队列被视为空后重新订阅队列,例如与:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))