Not able to consume messages from a Kafka topic through Spring Boot, but the same messages are consumed by the console consumer

I have a simple Spring Boot application that has both a producer and a consumer as services. They live in the same application.

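My controller receives the message and passes it to the producer, but the consumer layer of the application never receives it. The problem is that the message is not being consumed by the consumer.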

This is the Eclipse IDE log output I get when I run the Spring Boot application and the controller receives its first message:

2021-02-23 22:33:50.637  INFO 13792 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 3 ms
2021-02-23 22:33:50.898  INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController       : writeStringMessageToTopic called.
2021-02-23 22:33:50.899  INFO 13792 --- [nio-8080-exec-1] c.h.a.m.controller.KafkaController       : recieved string message = '"22:26 Hello World"'
2021-02-23 22:33:50.953  INFO 13792 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = true
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-02-23 22:33:51.200  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-02-23 22:33:51.201  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-02-23 22:33:51.201  INFO 13792 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1614099831197
2021-02-23 22:33:51.645  INFO 13792 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: paGjVV-5RVyOatWyXOzBrQ

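After the last line above, the log message that my KafkaConsumer class should write when it consumes a message is never printed. It seems the consumer is not even being triggered. Why is that? Have I misconfigured something?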

This is my KafkaConsumerConfig.java class:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(String groupId) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(groupId));
        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("foo");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("bar");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> headersKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("headers");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> partitionsKafkaListenerContainerFactory() {
        return kafkaListenerContainerFactory("partitions");
    }

   
    public ConsumerFactory<String, EntryObject> entryObjectConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "entryObject");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EntryObject.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EntryObject> entryObjectKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EntryObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(entryObjectConsumerFactory());
        return factory;
    }
}

This is my KafkaConsumer.java class:

public class KafkaConsumer {
    
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
    @KafkaListener(topics = "myTopic", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "myTopic", containerFactory = "greetingKafkaListenerContainerFactory")
    public void receive(EntryObject entryObject) throws IOException {
        logger.info("received entryObject = '{}'", entryObject.toString());
    }
}

This is my application.properties file:

kafka.bootstrapAddress=localhost:9092
message.topic.name=myTopic

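I have verified that Kafka is running on my machine, and the same topic can be read from my local terminal with the console consumer. But the Spring Boot consumer does not receive anything. Is there a mistake in my code?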

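The class that contains the @KafkaListener methods needs to be registered as a Spring bean. Annotate KafkaConsumer with @Component, or with @Service if it belongs to the service layer (which is more specific). Without one of these annotations Spring never instantiates the class, so the @KafkaListener methods are never registered and no consumer is started.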
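
As a minimal sketch of that change, the consumer from the question could look like the following. It assumes the class sits in a package that component scanning covers (for example, below the package of the @SpringBootApplication class) and that spring-kafka and SLF4J are on the classpath; only the String listener is shown.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// @Service registers this class as a Spring bean, so the @KafkaListener
// method below is discovered and a listener container is started for it.
// Assumption: the class is in a package picked up by component scanning.
@Service
public class KafkaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // Topic, group id and container factory are taken unchanged from the question.
    @KafkaListener(topics = "myTopic", groupId = "foo",
                   containerFactory = "fooKafkaListenerContainerFactory")
    public void receive(String message) {
        logger.info("#### -> Consumed message -> {}", message);
    }
}
```

One thing to watch once the class is a bean: the second listener in the question refers to a container factory named "greetingKafkaListenerContainerFactory", which is not defined in the KafkaConsumerConfig shown. The closest bean there is entryObjectKafkaListenerContainerFactory, so that name would likely need to be used instead, otherwise startup can be expected to fail when the listener is registered.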