spring-kafka 中可能的空指针
Possible Nullpointer in spring-kafka
我们正在使用 spring-kafka-2.0.1.RELEASE 并在生产中遇到此错误:
exception java.lang.NullPointerException
java.lang.NullPointerException: null at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access[=12=]0(KafkaMessageListenerContainer.java:271)
~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] at
org.springframework.kafka.listener.KafkaMessageListenerContainer.getAssignedPartitions(KafkaMessageListenerContainer.java:152)
~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] at
我们正在使用以下道具:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EspDLPResponseDeserializer.class);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectMs); // when esp server unreachable, default value too small
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff); // default value too small
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRec);
if (saslEnabled) {
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// jaas.conf with -D server start parameter
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaas_config);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ssl_keystore);
}
你能检查一下吗?
非常感谢
这绝对是一个错误。如果 this.listenerConsumer != null
,我们确实不检查那里。此 属性 是从 doStart()
填充的,但不能保证在此之前不会调用 getAssignedPartitions()
。例如,我在 toString()
:
中看到它的用法
public String toString() {
return "KafkaMessageListenerContainer [id=" + getBeanName()
+ (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
+ ", topicPartitions=" + getAssignedPartitions()
+ "]";
}
请就此事提出 GH 问题:https://github.com/spring-projects/spring-kafka/issues
作为解决方法,不要创建 bean。
我们正在使用 spring-kafka-2.0.1.RELEASE 并在生产中遇到此错误:
exception java.lang.NullPointerException java.lang.NullPointerException: null at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access[=12=]0(KafkaMessageListenerContainer.java:271) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer.getAssignedPartitions(KafkaMessageListenerContainer.java:152) ~[spring-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE] at
我们正在使用以下道具:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EspDLPResponseDeserializer.class);
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectMs); // when esp server unreachable, default value too small
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff); // default value too small
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRec);
if (saslEnabled) {
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
// jaas.conf with -D server start parameter
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaas_config);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ssl_keystore);
}
你能检查一下吗?
非常感谢
这绝对是一个错误。如果 this.listenerConsumer != null
,我们确实不检查那里。此 属性 是从 doStart()
填充的,但不能保证在此之前不会调用 getAssignedPartitions()
。例如,我在 toString()
:
public String toString() {
return "KafkaMessageListenerContainer [id=" + getBeanName()
+ (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
+ ", topicPartitions=" + getAssignedPartitions()
+ "]";
}
请就此事提出 GH 问题:https://github.com/spring-projects/spring-kafka/issues
作为解决方法,不要创建 bean。