Return 没有消息时来自 Kafka 消费者
Return from Kafka consumer when there is no message
我想在应用程序启动时使用 Confluent dotnet client 处理主题。假设以下示例:
while (true)
{
try
{
var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
当Kafka没有新消息时,c.Consume会被阻塞。因为我想将它用于应用程序启动(如缓存预热),所以我想在发现没有新消息时继续我的代码。
我知道像 c.Consume(timeout)
这样设置超时会导致过载,但这种方法的问题是,如果您的主题中有一条消息并且阅读该消息的持续时间超过了您的超时时间,您收到不希望的空输出。
消费者不应该知道生产者。
现在如果你想知道从你开始消费的那一刻起你已经阅读了主题中的所有内容,你可以:
- 在开始消费之前加载最新的偏移量。
- 然后开始消费消息。
- 如果消息的偏移量与您之前加载的最新偏移量相同,则停止消费。
我不是 C#
开发人员,但根据我在 dotnet 融合文档中阅读的内容,您可以调用 QueryWatermarkOffsets
消费者以获得最旧和最新的偏移量。 https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_
然后,在 Message
class 上,您有一个 Offset
访问器。所以整个事情应该不会太难实现。
https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
您可以使用 OnPartitionEOF
事件来指示您已到达分区末尾。
CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;
c.OnPartitionEOF += (o, e) =>
{
Console.WriteLine($"You have reached end of partition");
isContinue = false;
source.Cancel();
};
while (isContinue)
{
try
{
var cr = c.Consume(source.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
我发现 Consumer.IsPartitionEOF 很有用。
我想在应用程序启动时使用 Confluent dotnet client 处理主题。假设以下示例:
while (true)
{
try
{
var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
当Kafka没有新消息时,c.Consume会被阻塞。因为我想将它用于应用程序启动(如缓存预热),所以我想在发现没有新消息时继续我的代码。
我知道像 c.Consume(timeout)
这样设置超时会导致过载,但这种方法的问题是,如果您的主题中有一条消息并且阅读该消息的持续时间超过了您的超时时间,您收到不希望的空输出。
消费者不应该知道生产者。
现在如果你想知道从你开始消费的那一刻起你已经阅读了主题中的所有内容,你可以:
- 在开始消费之前加载最新的偏移量。
- 然后开始消费消息。
- 如果消息的偏移量与您之前加载的最新偏移量相同,则停止消费。
我不是 C#
开发人员,但根据我在 dotnet 融合文档中阅读的内容,您可以调用 QueryWatermarkOffsets
消费者以获得最旧和最新的偏移量。 https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_QueryWatermarkOffsets_Confluent_Kafka_TopicPartition_
然后,在 Message
class 上,您有一个 Offset
访问器。所以整个事情应该不会太难实现。
https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html#Confluent_Kafka_Message_Offset
您可以使用 OnPartitionEOF
事件来指示您已到达分区末尾。
CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;
c.OnPartitionEOF += (o, e) =>
{
Console.WriteLine($"You have reached end of partition");
isContinue = false;
source.Cancel();
};
while (isContinue)
{
try
{
var cr = c.Consume(source.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
我发现 Consumer.IsPartitionEOF 很有用。