如何使用 spring webflux 从 Kafka 主题中持续消费?
How to continually consume from Kafka topic using spring webflux?
我有一个名为 WordListenerHandler
的 spring-webflux
处理程序,我可以从浏览器请求 HTTP GET。此请求是使用 Flux
: .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class)
的 Long
流,因此它会不断更新 HTML 而无需刷新页面。我想显示 kafka topi 的消息而不是 Long 值流。所以我有消费者方法 onMessage(ConsumerRecord<String, String> consumerRecord)
正在接收消息,我正在填充一个 Queue<String> stringStream
。我的想法是在 Flux.fromStream
的 HTTP GET 方法上使用此队列。但它是静态的,我必须刷新浏览器才能看到更新。如何让HTTP GET流持续获取数据?
@Component
public class WordListenerHandler {
private final String WORDS_STREAMING_OUTPUT_TOPIC_NAME = "streaming-words-output-topic";
Queue<String> stringStream = new LinkedList<String>();
public Mono<ServerResponse> fluxWordCountStream(ServerRequest serverRequest) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
// DOES NOT WORK. i HAVE TO REFRESH THE BROWSER
.body(Flux.fromStream(stringStream.stream()), String.class);
// THIS WORKS AND I CAN CONTINUALLY SEE MY BROWSER UPDATE WITHOUT REFRESHING IT
// .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class);
}
@KafkaListener(topics = {WORDS_STREAMING_OUTPUT_TOPIC_NAME})
public void onMessage(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
log.info("ConsumerRecord received: {}", consumerRecord);
String message = consumerRecord.key() + " : " + consumerRecord.value() + "\n";
stringStream.add(message);
}
}
尝试使用 LinkedBlockingQueue
而不是 LinkedList
(这不是线程安全的)。
* <p>The iterators returned by this class's {@code iterator} and
* {@code listIterator} methods are <i>fail-fast</i>: if the list is
* structurally modified at any time after the iterator is created, in
* any way except through the Iterator's own {@code remove} or
* {@code add} methods, the iterator will throw a {@link
* ConcurrentModificationException}. Thus, in the face of concurrent
* modification, the iterator fails quickly and cleanly, rather than
* risking arbitrary, non-deterministic behavior at an undetermined
* time in the future.
我建议查看 Sink
Reactor 的 API:https://projectreactor.io/docs/core/release/reference/#sinks
因此,您的 @KafkaListener
将以反压方式将接收到的数据转储到共享 Sinks.Many
中。
而您的 MediaType.APPLICATION_STREAM_JSON
将只使用来自该接收器的 asFlux()
。
我认为您的问题是该集合中的 stringStream.stream()
没有提供活动对象,而是集合当前状态的快照。所以,这看起来确实很自然,您需要刷新请求以从集合中获取更新状态。
我有一个名为 WordListenerHandler
的 spring-webflux
处理程序,我可以从浏览器请求 HTTP GET。此请求是使用 Flux
: .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class)
的 Long
流,因此它会不断更新 HTML 而无需刷新页面。我想显示 kafka topi 的消息而不是 Long 值流。所以我有消费者方法 onMessage(ConsumerRecord<String, String> consumerRecord)
正在接收消息,我正在填充一个 Queue<String> stringStream
。我的想法是在 Flux.fromStream
的 HTTP GET 方法上使用此队列。但它是静态的,我必须刷新浏览器才能看到更新。如何让HTTP GET流持续获取数据?
@Component
public class WordListenerHandler {
private final String WORDS_STREAMING_OUTPUT_TOPIC_NAME = "streaming-words-output-topic";
Queue<String> stringStream = new LinkedList<String>();
public Mono<ServerResponse> fluxWordCountStream(ServerRequest serverRequest) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_STREAM_JSON)
// DOES NOT WORK. i HAVE TO REFRESH THE BROWSER
.body(Flux.fromStream(stringStream.stream()), String.class);
// THIS WORKS AND I CAN CONTINUALLY SEE MY BROWSER UPDATE WITHOUT REFRESHING IT
// .body(Flux.interval(Duration.ofSeconds(1)).log(), Long.class);
}
@KafkaListener(topics = {WORDS_STREAMING_OUTPUT_TOPIC_NAME})
public void onMessage(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {
log.info("ConsumerRecord received: {}", consumerRecord);
String message = consumerRecord.key() + " : " + consumerRecord.value() + "\n";
stringStream.add(message);
}
}
尝试使用 LinkedBlockingQueue
而不是 LinkedList
(这不是线程安全的)。
* <p>The iterators returned by this class's {@code iterator} and
* {@code listIterator} methods are <i>fail-fast</i>: if the list is
* structurally modified at any time after the iterator is created, in
* any way except through the Iterator's own {@code remove} or
* {@code add} methods, the iterator will throw a {@link
* ConcurrentModificationException}. Thus, in the face of concurrent
* modification, the iterator fails quickly and cleanly, rather than
* risking arbitrary, non-deterministic behavior at an undetermined
* time in the future.
我建议查看 Sink
Reactor 的 API:https://projectreactor.io/docs/core/release/reference/#sinks
因此,您的 @KafkaListener
将以反压方式将接收到的数据转储到共享 Sinks.Many
中。
而您的 MediaType.APPLICATION_STREAM_JSON
将只使用来自该接收器的 asFlux()
。
我认为您的问题是该集合中的 stringStream.stream()
没有提供活动对象,而是集合当前状态的快照。所以,这看起来确实很自然,您需要刷新请求以从集合中获取更新状态。