Not able to consume messages from a Kafka topic through Spring Boot, but the same message is being consumed by the console consumer
I have a simple Spring Boot application that has both a producer and a consumer as services; they live in the same application.
My controller receives the message, but the consumer layer of the application never does. In other words, I pass the message from the controller to the producer, yet it never reaches the consumer. The problem is that the message is not being consumed.
Here are the Eclipse IDE log messages I get when I run the Spring Boot application and the controller receives the first message:
2021-02-23 22:33:50.637 INFO 13792 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
2021-02-23 22:33:50.898 INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController : writeStringMessageToTopic called.
2021-02-23 22:33:50.899 INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController : recieved string message = '"22:26 Hello World"'
2021-02-23 22:33:50.953 INFO 13792 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2021-02-23 22:33:51.200 INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-02-23 22:33:51.201 INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-02-23 22:33:51.201 INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1614099831197
2021-02-23 22:33:51.645 INFO 13792 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ
After the last log line above, the log message from my KafkaConsumer class that should be printed on consuming a message never appears. It seems the consumer is never even triggered. Why is that? Have I misconfigured something?
Here is my KafkaConsumerConfig.java class:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("bar");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }

    public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(entryObjectConsumerFactory());
        return factory;
    }
}
Here is my KafkaConsumer.java class:
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "myTopic", containerFactory = "greetingKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'", entryObject.toString());
    }
}
Here is my application.properties file:
kafka.bootstrapAddress=localhost:9092
message.topic.name=myTopic
I have verified that Kafka is running on my machine, and the same topic is also accessible from a terminal on the same host. But I cannot reach it from the Spring Boot consumer. Is there any mistake in my code?
Your KafkaListener class needs to be registered as a bean: either declare it as a @Bean, or annotate the class with @Component or @Service (the latter being more specific if it lives in the service layer).
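For example, a minimal sketch of the fix is to add @Component to the listener class from the question. (This assumes the EntryObject type from the question; note also that the second listener references "greetingKafkaListenerContainerFactory", while the posted KafkaConsumerConfig only defines "entryObjectKafkaListenerContainerFactory", so the name is corrected here to match an existing bean.)

```java
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// @Component makes the class a Spring-managed bean, so component scanning
// picks it up, the @KafkaListener annotations are detected, and listener
// containers are actually created and started.
@Component
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info("#### -> Consumed message -> {}", message);
    }

    // containerFactory must name an existing bean; the config in the question
    // defines "entryObjectKafkaListenerContainerFactory", not
    // "greetingKafkaListenerContainerFactory".
    @KafkaListener(topics = "myTopic", containerFactory = "entryObjectKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'", entryObject);
    }
}
```

Without the @Component (or @Service) annotation, Spring never instantiates the class, so the @KafkaListener methods are never wired to listener containers and no consumer is created. This matches the logs above, where only ProducerConfig values are printed and no ConsumerConfig block ever appears.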