使用来自 Kafka 主题的消息 - 无响应

Consume messages from Kafka Topic - no response

var configs = new Dictionary<string, string>
{
    {"bootstrap.servers", MY_SERVER},
    {"security.protocol", "SASL_PLAINTEXT"},
    {"sasl.mechanism", "SCRAM-SHA-256"},
    {"sasl.username", "MY_USERNAME"},
    {"sasl.password", "MY_PWD"},
    {"group.id", "sample_group"} // added
};
var consumerConfig = new ConsumerConfig(configs);    

using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer = new ConsumerBuilder<string, MyModel>(consumerConfig)
           .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
           .SetValueDeserializer(new AvroDeserializer<MyModel>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
           .Build())
{
      consumer.Subscribe(TOPIC_NAME);

      while (true)
      {
          var result = consumer.Consume(); //stuck here
          Console.WriteLine(result);
      }
 }

如代码中所述,consumer.Consume() 没有响应。即使在 consumer.Subscribe() 期间它也不会抛出任何错误消息可能的原因是什么? (我是 Kafka 消费者的新手)

  1. 可能Topic中没有消息,所以没有接收到?
  2. 代码要求缺少 'group.id',所以我在配置中添加了 {"group.id", "sample_group"} 并用 ConsumerConfig 换行。 group.id 是否允许使用随机名称(“sample_group”),还是应该从主题信息中检索到某些名称?
  3. 还有什么吗?

您的代码看起来不错,没有出现错误和异常也是一个好兆头。

"1. Maybe there is no message in Topic, so nothing to receive?"

即使 Kafka 主题中没有消息,您的观察也符合预期的行为。在 while(true) 循环中,您不断尝试从主题中获取数据,如果无法获取任何数据,消费者将在下一次迭代中重试。 Kafka 主题的消费者应该在连续 运行ning 的同时按顺序阅读主题。有时消费者已经消费完所有消息并保持空闲一段时间,直到新消息到达主题,这完全没问题。在等待时间内,消费者不会停止或崩溃。

请记住,Kafka 主题中的消息默认保留 7 天。在那之后,消息将被删除。

"2. The code asked for missing 'group.id', so I added {"group.id", "sample_group"} in config and wrap with ConsumerConfig. Is random name ("sample_group") allowed for group.id or should it be something retrieved from Topic information?"

是的,允许名称“sample_group”作为 ConsumerGroup 名称。没有保留的消费者组名称,所以这个名称不会造成任何麻烦。

"3. anything else?"

默认情况下,KafkaConsumer 从“最新”偏移量读取消息。这意味着,如果您第一次 运行 ConsumerGroup,它不会从头而是从尾读取所有消息。检查 .net Kafka-API 文档中的使用者配置是否有类似 auto_offset_reset 的内容。如果您想从头开始阅读所有消息,您可以将此配置设置为“最早”。请注意,一旦您 运行 您的应用程序第一次使用给定的 ConsumerGroup,第二次您 运行 此应用程序此配置 auto_offset_reset 将不会有任何影响,因为 ConsumerGroup现在已在 Kafka 中注册。

如果您在开始为该主题生成消息之前启动您的消费者,那么您通常可以做的是确保消费者真正阅读消息。然后,(几乎)独立于您的配置,您应该看到数据流经您的应用程序。