spring Kafka 负载类型的模糊方法
spring Kafka Ambiguous methods for payload type
我正在尝试使用 spring-kafka-2.2.0
和 spring-boot-2.1.0
从 Kafka 主题中消耗两种不同类型的有效载荷,但不幸的是,到目前为止运气不佳。
所以我的项目中有两个模型 com.kafka.model.Professor
和 com.kafka.model.Student
,我也在类型映射中配置了它们。但是我不确定在这个反序列化中我遗漏了哪一部分。
配置
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.consumer.bootstrap-server}")
private String bootstrapServer;
@Bean("consumerConfigs")
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-4");
props.put(JsonDeserializer.TYPE_MAPPINGS,
"professor:com.kafka.model.Professor, student:com.kafka.model.Student");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-4");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
return props;
}
@Bean("consumerFactory")
public ConsumerFactory<String, Object> consumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(Object.class);
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean("kafkaListenerContainerFactory")
@ConditionalOnMissingBean(name = "KafkaAutoConfiguration")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
}
听众class
@Service
@KafkaListener(topics = "test-events",id = "kafkaListenerContainerFactory-11")
public class KafkaConsumerService {
@KafkaHandler
public void student(List<Student> stu) {
System.out.println(stu);
}
@KafkaHandler
public void professor(List<Professor> pro) {
System.out.println(pro);
}
}
错误
2019-02-15 13:04:33.060 ERROR 14863 --- [actory-11-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
ConsumerRecord(topic = stores-pricing-easytest-manual-events, partition = 4, offset = 1189400, CreateTime = 1550096019845, serialized key size = 7, serialized value size = 49, headers = RecordHeaders(headers = [], isReadOnly = false), key = student, value = {studentName=thomas, rollNum=109, age=35})
org.springframework.kafka.KafkaException: Ambiguous methods for payload type: class java.util.ArrayList: student and professor
@KafkaHandler
方法不适用于 BATCH
模式。至少以您现在拥有的方式:没有方法可以通过通用参数处理不同的批次。
考虑关闭批处理模式或仅在一个 @KafkaListener
方法中处理所有内容,该方法已经接受 List
作为具有任何通用信息的有效负载的参数。
注意:Apache Kafka 不会在不同批次之间分发反序列化记录。它们都将在同一个 ConsumerRecords
中为听众制作。因此,您尝试按列表中的泛型类型进行分发的逻辑永远不会发生,即使我们会找出方法参数的泛型类型......所以,普通的 RECORD
more 是你的救星.
我正在尝试使用 spring-kafka-2.2.0
和 spring-boot-2.1.0
从 Kafka 主题中消耗两种不同类型的有效载荷,但不幸的是,到目前为止运气不佳。
所以我的项目中有两个模型 com.kafka.model.Professor
和 com.kafka.model.Student
,我也在类型映射中配置了它们。但是我不确定在这个反序列化中我遗漏了哪一部分。
配置
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.consumer.bootstrap-server}")
private String bootstrapServer;
@Bean("consumerConfigs")
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-4");
props.put(JsonDeserializer.TYPE_MAPPINGS,
"professor:com.kafka.model.Professor, student:com.kafka.model.Student");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test-4");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
return props;
}
@Bean("consumerFactory")
public ConsumerFactory<String, Object> consumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(Object.class);
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean("kafkaListenerContainerFactory")
@ConditionalOnMissingBean(name = "KafkaAutoConfiguration")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
}
听众class
@Service
@KafkaListener(topics = "test-events",id = "kafkaListenerContainerFactory-11")
public class KafkaConsumerService {
@KafkaHandler
public void student(List<Student> stu) {
System.out.println(stu);
}
@KafkaHandler
public void professor(List<Professor> pro) {
System.out.println(pro);
}
}
错误
2019-02-15 13:04:33.060 ERROR 14863 --- [actory-11-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
ConsumerRecord(topic = stores-pricing-easytest-manual-events, partition = 4, offset = 1189400, CreateTime = 1550096019845, serialized key size = 7, serialized value size = 49, headers = RecordHeaders(headers = [], isReadOnly = false), key = student, value = {studentName=thomas, rollNum=109, age=35})
org.springframework.kafka.KafkaException: Ambiguous methods for payload type: class java.util.ArrayList: student and professor
@KafkaHandler
方法不适用于 BATCH
模式。至少以您现在拥有的方式:没有方法可以通过通用参数处理不同的批次。
考虑关闭批处理模式或仅在一个 @KafkaListener
方法中处理所有内容,该方法已经接受 List
作为具有任何通用信息的有效负载的参数。
注意:Apache Kafka 不会在不同批次之间分发反序列化记录。它们都将在同一个 ConsumerRecords
中为听众制作。因此,您尝试按列表中的泛型类型进行分发的逻辑永远不会发生,即使我们会找出方法参数的泛型类型......所以,普通的 RECORD
more 是你的救星.