Kafka Flux 无法在 Rest Controller 中工作
Kafka Flux not working in Rest Controller
我有以下控制器:
private final Flux<ReceiverRecord<String, String>> fluxReceiver;
@GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getEvents() {
return fluxReceiver.map(x -> x.value());
}
然后我这样声明 bean Flux>:
@Bean
public Flux<ReceiverRecord<String, String>> fluxReceiver() {
String bootstrapServers = "localhost:9092";
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);
ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(TOPIC));
return KafkaReceiver.create(options).receive();
}
然后,我可以将它注入控制器,并且 return 当有人调用 rest 端点 /events 时它。
我正在使用 cURL 调用该端点。事情是,当我发送一个事件时,如果我调试它,我可以看到 .map 方法被触发并且我 return 事件的值,在本例中是一个字符串。但是,在 cURL 控制台中我看不到该事件。
但是,如果我不使用 Kafka,而我创建了一个 TopicProcessor,那么我可以在 cURL 中看到该事件。
难道我做错了什么?
免责声明:我对 Kafka 的了解有限。
Kafka 主题是连续的,因此主题的 Flux
表示不会发出 onComplete
。因此,Spring Framework 会将 Flux
转换为使用分块编码的流式响应...
cUrl
期望从服务器返回默认的有限响应。它执行一些缓冲,并且总体上不一定在控制台 到达 时在控制台上写入 HTTP 块。但是在这种流式响应的情况下,你需要它做的是在数据可用时立即输出。
这可以通过 cUrl
的 --no-buffer
标志来完成。
我有以下控制器:
private final Flux<ReceiverRecord<String, String>> fluxReceiver;
@GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<String> getEvents() {
return fluxReceiver.map(x -> x.value());
}
然后我这样声明 bean Flux>:
@Bean
public Flux<ReceiverRecord<String, String>> fluxReceiver() {
String bootstrapServers = "localhost:9092";
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props);
ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(TOPIC));
return KafkaReceiver.create(options).receive();
}
然后,我可以将它注入控制器,并且 return 当有人调用 rest 端点 /events 时它。
我正在使用 cURL 调用该端点。事情是,当我发送一个事件时,如果我调试它,我可以看到 .map 方法被触发并且我 return 事件的值,在本例中是一个字符串。但是,在 cURL 控制台中我看不到该事件。
但是,如果我不使用 Kafka,而我创建了一个 TopicProcessor,那么我可以在 cURL 中看到该事件。 难道我做错了什么?
免责声明:我对 Kafka 的了解有限。
Kafka 主题是连续的,因此主题的 Flux
表示不会发出 onComplete
。因此,Spring Framework 会将 Flux
转换为使用分块编码的流式响应...
cUrl
期望从服务器返回默认的有限响应。它执行一些缓冲,并且总体上不一定在控制台 到达 时在控制台上写入 HTTP 块。但是在这种流式响应的情况下,你需要它做的是在数据可用时立即输出。
这可以通过 cUrl
的 --no-buffer
标志来完成。