使用来自 Kafka 主题的消息 - 无响应
Consume messages from Kafka Topic - no response
var configs = new Dictionary<string, string>
{
{"bootstrap.servers", MY_SERVER},
{"security.protocol", "SASL_PLAINTEXT"},
{"sasl.mechanism", "SCRAM-SHA-256"},
{"sasl.username", "MY_USERNAME"},
{"sasl.password", "MY_PWD"},
{"group.id", "sample_group"} // added
};
var consumerConfig = new ConsumerConfig(configs);
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer = new ConsumerBuilder<string, MyModel>(consumerConfig)
.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
.SetValueDeserializer(new AvroDeserializer<MyModel>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
.Build())
{
consumer.Subscribe(TOPIC_NAME);
while (true)
{
var result = consumer.Consume(); //stuck here
Console.WriteLine(result);
}
}
如代码中所述,consumer.Consume()
没有响应。即使在 consumer.Subscribe()
期间它也不会抛出任何错误消息可能的原因是什么? (我是 Kafka 消费者的新手)
- 可能Topic中没有消息,所以没有接收到?
- 代码要求缺少 'group.id',所以我在配置中添加了
{"group.id", "sample_group"}
并用 ConsumerConfig
换行。 group.id 是否允许使用随机名称(“sample_group”),还是应该从主题信息中检索到某些名称?
- 还有什么吗?
您的代码看起来不错,没有出现错误和异常也是一个好兆头。
"1. Maybe there is no message in Topic, so nothing to receive?"
即使 Kafka 主题中没有消息,您的观察也符合预期的行为。在 while(true)
循环中,您不断尝试从主题中获取数据,如果无法获取任何数据,消费者将在下一次迭代中重试。 Kafka 主题的消费者应该在连续 运行ning 的同时按顺序阅读主题。有时消费者已经消费完所有消息并保持空闲一段时间,直到新消息到达主题,这完全没问题。在等待时间内,消费者不会停止或崩溃。
请记住,Kafka 主题中的消息默认保留 7 天。在那之后,消息将被删除。
"2. The code asked for missing 'group.id', so I added {"group.id", "sample_group"} in config and wrap with ConsumerConfig. Is random name ("sample_group") allowed for group.id or should it be something retrieved from Topic information?"
是的,允许名称“sample_group”作为 ConsumerGroup 名称。没有保留的消费者组名称,所以这个名称不会造成任何麻烦。
"3. anything else?"
默认情况下,KafkaConsumer 从“最新”偏移量读取消息。这意味着,如果您第一次 运行 ConsumerGroup,它不会从头而是从尾读取所有消息。检查 .net Kafka-API 文档中的使用者配置是否有类似 auto_offset_reset
的内容。如果您想从头开始阅读所有消息,您可以将此配置设置为“最早”。请注意,一旦您 运行 您的应用程序第一次使用给定的 ConsumerGroup,第二次您 运行 此应用程序此配置 auto_offset_reset
将不会有任何影响,因为 ConsumerGroup现在已在 Kafka 中注册。
如果您在开始为该主题生成消息之前启动您的消费者,那么您通常可以做的是确保消费者真正阅读消息。然后,(几乎)独立于您的配置,您应该看到数据流经您的应用程序。
var configs = new Dictionary<string, string>
{
{"bootstrap.servers", MY_SERVER},
{"security.protocol", "SASL_PLAINTEXT"},
{"sasl.mechanism", "SCRAM-SHA-256"},
{"sasl.username", "MY_USERNAME"},
{"sasl.password", "MY_PWD"},
{"group.id", "sample_group"} // added
};
var consumerConfig = new ConsumerConfig(configs);
using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer = new ConsumerBuilder<string, MyModel>(consumerConfig)
.SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
.SetValueDeserializer(new AvroDeserializer<MyModel>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
.Build())
{
consumer.Subscribe(TOPIC_NAME);
while (true)
{
var result = consumer.Consume(); //stuck here
Console.WriteLine(result);
}
}
如代码中所述,consumer.Consume()
没有响应。即使在 consumer.Subscribe()
期间它也不会抛出任何错误消息可能的原因是什么? (我是 Kafka 消费者的新手)
- 可能Topic中没有消息,所以没有接收到?
- 代码要求缺少 'group.id',所以我在配置中添加了
{"group.id", "sample_group"}
并用ConsumerConfig
换行。 group.id 是否允许使用随机名称(“sample_group”),还是应该从主题信息中检索到某些名称? - 还有什么吗?
您的代码看起来不错,没有出现错误和异常也是一个好兆头。
"1. Maybe there is no message in Topic, so nothing to receive?"
即使 Kafka 主题中没有消息,您的观察也符合预期的行为。在 while(true)
循环中,您不断尝试从主题中获取数据,如果无法获取任何数据,消费者将在下一次迭代中重试。 Kafka 主题的消费者应该在连续 运行ning 的同时按顺序阅读主题。有时消费者已经消费完所有消息并保持空闲一段时间,直到新消息到达主题,这完全没问题。在等待时间内,消费者不会停止或崩溃。
请记住,Kafka 主题中的消息默认保留 7 天。在那之后,消息将被删除。
"2. The code asked for missing 'group.id', so I added {"group.id", "sample_group"} in config and wrap with ConsumerConfig. Is random name ("sample_group") allowed for group.id or should it be something retrieved from Topic information?"
是的,允许名称“sample_group”作为 ConsumerGroup 名称。没有保留的消费者组名称,所以这个名称不会造成任何麻烦。
"3. anything else?"
默认情况下,KafkaConsumer 从“最新”偏移量读取消息。这意味着,如果您第一次 运行 ConsumerGroup,它不会从头而是从尾读取所有消息。检查 .net Kafka-API 文档中的使用者配置是否有类似 auto_offset_reset
的内容。如果您想从头开始阅读所有消息,您可以将此配置设置为“最早”。请注意,一旦您 运行 您的应用程序第一次使用给定的 ConsumerGroup,第二次您 运行 此应用程序此配置 auto_offset_reset
将不会有任何影响,因为 ConsumerGroup现在已在 Kafka 中注册。
如果您在开始为该主题生成消息之前启动您的消费者,那么您通常可以做的是确保消费者真正阅读消息。然后,(几乎)独立于您的配置,您应该看到数据流经您的应用程序。