Kafka Flux 无法在 Rest Controller 中工作

Kafka Flux not working in Rest Controller

我有以下控制器:

private final Flux<ReceiverRecord<String, String>> fluxReceiver;

@GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getEvents() {
    return fluxReceiver.map(x -> x.value());
}

然后我这样声明 bean Flux>:

@Bean
public Flux<ReceiverRecord<String, String>> fluxReceiver() {
    String bootstrapServers = "localhost:9092";
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);

    ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(TOPIC));

    return KafkaReceiver.create(options).receive();

}

然后,我可以将它注入控制器,并且 return 当有人调用 rest 端点 /events 时它。

我正在使用 cURL 调用该端点。事情是,当我发送一个事件时,如果我调试它,我可以看到 .map 方法被触发并且我 return 事件的值,在本例中是一个字符串。但是,在 cURL 控制台中我看不到该事件。

但是,如果我不使用 Kafka,而我创建了一个 TopicProcessor,那么我可以在 cURL 中看到该事件。 难道我做错了什么?

免责声明:我对 Kafka 的了解有限。

Kafka 主题是连续的,因此主题的 Flux 表示不会发出 onComplete。因此,Spring Framework 会将 Flux 转换为使用分块编码的流式响应...

cUrl 期望从服务器返回默认的有限响应。它执行一些缓冲,并且总体上不一定在控制台 到达 时在控制台上写入 HTTP 块。但是在这种流式响应的情况下,你需要它做的是在数据可用时立即输出。

这可以通过 cUrl--no-buffer 标志来完成。