如何跳过 Spring Kafka Consumer 中损坏的(不可序列化的)消息?
How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?
此问题针对 Spring Kafka,与
有关
有没有办法配置 Spring Kafka 消费者跳过不能 read/processed(已损坏)的记录?
我看到这样一种情况,如果无法反序列化,消费者就会卡在同一条记录上。这是消费者抛出的错误。
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
消费者轮询主题并在循环中不断打印相同的错误,直到程序被终止。
在具有以下消费者工厂配置的@KafkaListener 中,
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
你需要ErrorHandlingDeserializer
:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
如果您不能移动到那个 2.2
版本,请考虑为那些无法正确反序列化的记录实施您自己的和 return null
。
如果您使用的是旧版本的 kafka,请在 @KafkaListener 中设置以下消费者工厂配置。
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
这是 CustomDeserializer 的代码:
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomDeserializer implements Deserializer<Object>
{
@Override
public void configure( Map<String, ?> configs, boolean isKey )
{
}
@Override
public Object deserialize( String topic, byte[] data )
{
ObjectMapper mapper = new ObjectMapper();
Object object = null;
try
{
object = mapper.readValue(data, Object.class);
}
catch ( Exception exception )
{
System.out.println("Error in deserializing bytes " + exception);
}
return object;
}
@Override
public void close()
{
}
}
因为我希望我的代码足够通用以阅读任何类型的 json,
对象 = mapper.readValue(数据, Object.class);我正在将其转换为 Object.class。由于我们在这里捕获异常,因此一旦读取就不会重试。
此问题针对 Spring Kafka,与
有没有办法配置 Spring Kafka 消费者跳过不能 read/processed(已损坏)的记录?
我看到这样一种情况,如果无法反序列化,消费者就会卡在同一条记录上。这是消费者抛出的错误。
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
消费者轮询主题并在循环中不断打印相同的错误,直到程序被终止。
在具有以下消费者工厂配置的@KafkaListener 中,
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
你需要ErrorHandlingDeserializer
:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
如果您不能移动到那个 2.2
版本,请考虑为那些无法正确反序列化的记录实施您自己的和 return null
。
如果您使用的是旧版本的 kafka,请在 @KafkaListener 中设置以下消费者工厂配置。
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
这是 CustomDeserializer 的代码:
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomDeserializer implements Deserializer<Object>
{
@Override
public void configure( Map<String, ?> configs, boolean isKey )
{
}
@Override
public Object deserialize( String topic, byte[] data )
{
ObjectMapper mapper = new ObjectMapper();
Object object = null;
try
{
object = mapper.readValue(data, Object.class);
}
catch ( Exception exception )
{
System.out.println("Error in deserializing bytes " + exception);
}
return object;
}
@Override
public void close()
{
}
}
因为我希望我的代码足够通用以阅读任何类型的 json, 对象 = mapper.readValue(数据, Object.class);我正在将其转换为 Object.class。由于我们在这里捕获异常,因此一旦读取就不会重试。