How to filter invalid incoming JSON data in a Kafka KStream

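I have a KStream that reads JSON messages from a Kafka (2.3.0) topic. Because I cannot guarantee that all of these messages are valid JSON, some of them cause JsonParseExceptions, which crash my application and do not appear to be handled by Kafka.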

I wrote the following code, which works fine for valid messages and parses them into a JsonNode.

    final StreamsBuilder builder = new StreamsBuilder();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
    final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
    final KStream<String, JsonNode> input = builder.stream("myTopic", consumed);

In my case, the topic contains some XML messages, which cannot be parsed into a JsonNode object. The following exception is thrown:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (byte[])"<StupidXML>"[truncated 289 bytes]; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (byte[])"<StupidXML>"[truncated 289 bytes]; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)

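Is there a way to react to the thrown exception? Is there a configuration option for ignoring or logging invalid messages, or some way to catch these exceptions?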

Many thanks.

You can use the StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG property to choose the deserialization exception handler.

According to the Javadoc, the default is org.apache.kafka.streams.errors.LogAndFailExceptionHandler:

Deserialization handler that logs a deserialization exception and then signals the processing pipeline to stop processing more records and fail.

You can change it to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler:

Deserialization handler that logs a deserialization exception and then signals the processing pipeline to continue processing more records.
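
As a rough sketch of what that configuration might look like: the snippet below builds the Properties you would pass to KafkaStreams and sets the handler by its class name. To keep it dependency-free it uses the literal key "default.deserialization.exception.handler", which is the value of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG in the 2.3.0 line; in real code you would normally reference the constant. The class name, application id, and broker address are placeholders, not values from the question.

```java
import java.util.Properties;

// Hypothetical helper class; only the handler key and handler class name come from Kafka.
final class StreamsErrorHandlingConfig {

    // Literal value of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    static final String LOG_AND_CONTINUE =
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "json-filter-app");    // placeholder application id
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker address
        // Log records that fail to deserialize and keep processing instead of failing.
        props.setProperty(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }

    public static void main(String[] args) {
        // Print the configured handler so the setting can be checked at a glance.
        System.out.println(buildConfig().getProperty(HANDLER_KEY));
    }
}
```

The resulting Properties would then be handed to the KafkaStreams constructor together with the topology from builder.build().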

You can also write a custom deserialization exception handler by implementing DeserializationExceptionHandler.
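
A minimal sketch of such a custom handler, assuming the interface as it looks in the Kafka Streams 2.3.0 line (newer releases may offer a different handle signature). The class name SkipInvalidJsonHandler is made up for illustration, and writing to System.err stands in for whatever logging you actually use. It needs kafka-streams and kafka-clients on the classpath.

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical handler: reports where the bad record came from, then skips it.
// It must be public with a no-arg constructor so Kafka Streams can instantiate it.
public class SkipInvalidJsonHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(ProcessorContext context,
                                                 ConsumerRecord<byte[], byte[]> record,
                                                 Exception exception) {
        // The record arrives as raw bytes because deserialization failed.
        System.err.printf("Skipping undeserializable record %s-%d@%d: %s%n",
                record.topic(), record.partition(), record.offset(), exception.getMessage());
        // CONTINUE drops this record and moves on; FAIL would stop processing.
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No handler-specific settings in this sketch.
    }
}
```

It would be registered the same way as the built-in handlers, for example props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, SkipInvalidJsonHandler.class). Instead of only printing, the handler could forward the raw bytes to a separate topic if the invalid messages need to be inspected later.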