如何在 Spring Kafka 中接收来自 KSQL 的流式响应?
How to receive streamed response from KSQL in Spring Kafka?
如何在 java spring 启动应用程序中接收来自 kafka KSQL 服务器的分块响应?
当我对 /query
端点进行休息调用时,我只得到 1 行并且连接关闭。如何保持连接打开并接收多行?
文档说
The response is streamed back until the LIMIT specified in the
statement is reached, or the client closes the connection.
在java中实现这个的方法是什么?即使对于 KTable,我在 return.
中也只有 1 行
我能够解决的方法如下:
- 获取字符串形式的响应
逐行解析JSON个对象(KafkaQueryResponse
是一个代表1行的对象)
ResponseEntity<String> result = template.exchange("/query",
HttpMethod.POST,
new HttpEntity<>(params, headers),
String.class);
List<KafkaQueryResponse> array = new ArrayList<>();
JsonFactory jsonFactory = new JsonFactory();
try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) {
Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class);
value.forEachRemaining(e -> {
if (e.getRow() != null) {
array.add(e);
}
});
}
array <---- this is the list of JSON objects
KafkaQueryResponse
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class KafkaQueryResponse {
private KafkaQueryRow row;
private String finalMessage;
private String errorMessage;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class KafkaQueryRow {
private List<Object> columns;
}
}
此解决方案不允许读取块中的流式响应。它等待整个响应到达客户端,然后关闭连接,然后解析所有 json 个对象。
如何在 java spring 启动应用程序中接收来自 kafka KSQL 服务器的分块响应?
当我对 /query
端点进行休息调用时,我只得到 1 行并且连接关闭。如何保持连接打开并接收多行?
文档说
The response is streamed back until the LIMIT specified in the statement is reached, or the client closes the connection.
在java中实现这个的方法是什么?即使对于 KTable,我在 return.
中也只有 1 行我能够解决的方法如下:
- 获取字符串形式的响应
逐行解析JSON个对象(
KafkaQueryResponse
是一个代表1行的对象)ResponseEntity<String> result = template.exchange("/query", HttpMethod.POST, new HttpEntity<>(params, headers), String.class); List<KafkaQueryResponse> array = new ArrayList<>(); JsonFactory jsonFactory = new JsonFactory(); try(BufferedReader br = new BufferedReader(new StringReader(result.getBody()))) { Iterator<KafkaQueryResponse> value = objectMapper.readValues(jsonFactory.createParser(br), KafkaQueryResponse.class); value.forEachRemaining(e -> { if (e.getRow() != null) { array.add(e); } }); } array <---- this is the list of JSON objects
KafkaQueryResponse
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class KafkaQueryResponse {
private KafkaQueryRow row;
private String finalMessage;
private String errorMessage;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public static class KafkaQueryRow {
private List<Object> columns;
}
}
此解决方案不允许读取块中的流式响应。它等待整个响应到达客户端,然后关闭连接,然后解析所有 json 个对象。