How to continually consume from Kafka topic using spring webflux?

I have a spring-webflux handler called WordListenerHandler that I can request over HTTP GET from a browser. The request streams Long values using a Flux: .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class), so the HTML updates continually without refreshing the page. Instead of the stream of Long values I want to display the messages from a Kafka topic. I have a consumer method onMessage(ConsumerRecord<String, String> consumerRecord) that is receiving the messages, and I fill a Queue<String> stringStream with them. My idea was to use this queue with Flux.fromStream in the HTTP GET handler. But the output is static and I have to refresh the browser to see updates. How can I make the HTTP GET stream fetch data continuously?

import java.util.LinkedList;
import java.util.Queue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import com.fasterxml.jackson.core.JsonProcessingException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class WordListenerHandler {

    // must be a compile-time constant (static final) to be usable in the annotation below
    private static final String WORDS_STREAMING_OUTPUT_TOPIC_NAME = "streaming-words-output-topic";

    private static final Logger log = LoggerFactory.getLogger(WordListenerHandler.class);

    Queue<String> stringStream = new LinkedList<String>();

    public Mono<ServerResponse> fluxWordCountStream(ServerRequest serverRequest) {

        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_STREAM_JSON)

                 // DOES NOT WORK. I have to refresh the browser
                .body(Flux.fromStream(stringStream.stream()), String.class);

               // THIS WORKS AND I CAN CONTINUALLY SEE MY BROWSER UPDATE WITHOUT REFRESHING IT
               // .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class);
    }

    @KafkaListener(topics = {WORDS_STREAMING_OUTPUT_TOPIC_NAME})
    public void onMessage(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
        log.info("ConsumerRecord received: {}", consumerRecord);

        String message = consumerRecord.key() + " : " + consumerRecord.value() + "\n";
        stringStream.add(message);
    }
}

Try using a LinkedBlockingQueue instead of a LinkedList (which is not thread-safe). The LinkedList Javadoc warns about concurrent modification:

 * <p>The iterators returned by this class's {@code iterator} and
 * {@code listIterator} methods are <i>fail-fast</i>: if the list is
 * structurally modified at any time after the iterator is created, in
 * any way except through the Iterator's own {@code remove} or
 * {@code add} methods, the iterator will throw a {@link
 * ConcurrentModificationException}.  Thus, in the face of concurrent
 * modification, the iterator fails quickly and cleanly, rather than
 * risking arbitrary, non-deterministic behavior at an undetermined
 * time in the future.
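A minimal sketch of the thread-safe hand-off, using only the JDK (the QueueBridge class and its method names are illustrative, not part of the original code): the Kafka listener thread adds, and a consumer on another thread blocks in take() until an element arrives, with no fail-fast iteration involved.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueBridge {

        // LinkedBlockingQueue is safe for concurrent producers and consumers
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // called from the @KafkaListener thread instead of LinkedList.add(...)
        public void publish(String message) {
            queue.add(message);
        }

        // blocks until an element is available; safe to call from another thread
        public String next() throws InterruptedException {
            return queue.take();
        }
    }

Note that swapping the queue implementation only fixes the thread-safety hazard; it does not by itself turn the HTTP response into a live stream.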

I suggest looking at Reactor's Sinks API: https://projectreactor.io/docs/core/release/reference/#sinks

Your @KafkaListener would then dump the received data into a shared Sinks.Many in a backpressure-aware manner.

And your MediaType.APPLICATION_STREAM_JSON endpoint would simply consume asFlux() from that sink.

I think your problem is that stringStream.stream() does not give you a live view of the collection, only a snapshot of its current state. So it is natural that you have to refresh the request to get the updated state of the collection.
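The Sinks approach above can be sketched like this (the WordSink class and its method names are illustrative; it assumes reactor-core on the classpath). The shared sink bridges the blocking Kafka listener thread and the reactive HTTP response:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;

    public class WordSink {

        // multicast() lets several browser tabs subscribe to the same stream;
        // onBackpressureBuffer() holds messages until a subscriber is ready
        private final Sinks.Many<String> sink =
                Sinks.many().multicast().onBackpressureBuffer();

        // call this from the @KafkaListener method instead of Queue.add(...)
        public void publish(String message) {
            sink.tryEmitNext(message);
        }

        // use this as the response body, e.g.
        // ServerResponse.ok().contentType(MediaType.APPLICATION_STREAM_JSON)
        //         .body(wordSink.flux(), String.class);
        public Flux<String> flux() {
            return sink.asFlux();
        }
    }

Unlike Flux.fromStream over a queue snapshot, subscribers of asFlux() keep receiving elements as they are emitted, so the browser updates without a refresh.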