Kafka 中主题的偏移量是如何工作的 (Kafka_net)
How does the offset for a topic work in Kafka (Kafka_net)
我有一个基本的生产者应用程序和一个消费者应用程序。如果我 运行 都开始关注各自的主题,那么我就有了一个很棒的工作系统。我的想法是,如果我启动生产者并发送一条消息,我就可以启动消费者并让它接收该消息。我错了
除非两者都启动并且 运行ning,否则我会丢失消息(或者它们不会被消耗)。
我的消费者应用看起来像这样用于消费...
Uri uri = new Uri("http://localhost:9092");
KafkaOptions options = new KafkaOptions(uri);
BrokerRouter brokerRouter = new BrokerRouter(options);
Consumer consumer = new Consumer(new ConsumerOptions(receiveTopic, brokerRouter));
List<OffsetResponse> offset = consumer.GetTopicOffsetAsync(receiveTopic, 100000).Result;
IEnumerable<OffsetPosition> t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());
IEnumerable<KafkaNet.Protocol.Message> msgs = consumer.Consume();
foreach (KafkaNet.Protocol.Message msg in msgs)
{
do some stuff here based on the message received
}
除非我有字里行间的代码,否则每次启动应用程序时它都会从头开始。
管理主题偏移以便在断开连接发生后使用消息的正确方法是什么?
如果我运行
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic chat-message-reply-XXX consumer-property fetch-size=40000000 --from-beginning
我可以看到消息,但是当我将我的应用程序连接到该主题时,consumer.Consume() 不会接收它尚未看到的消息。我在使用和不使用 运行 上述 bat 文件的情况下进行了尝试,以查看是否有任何区别。当我查看 consumer.SetOffsetPosition(t.ToArray()) 调用(特别是 t)时,它显示偏移量是该主题的所有消息的计数。
请帮忙,
将 ConsumerOptions
中的 auto.offset.reset
配置设置为 earliest
。当消费者组开始消费消息时,它会从最新的偏移量开始消费,因为auto.offset.reset
的默认值是最新的。
但是我现在看kafka-netAPI,没有AutoOffsetReset
属性,好像其在消费者中的配置相当不足。它还缺少方法摘要文档。
我建议您使用 Confluent .NET Kafka Nuget 包,因为它属于 Confluent 本身。
此外,为什么要调用 GetTopicOffsets
并在消费者中再次设置该偏移量。我认为当你配置你的消费者时,你应该开始使用 Consume()
.
阅读消息
试试这个:
static void Main(string[] args)
{
var uri = new Uri("http://localhost:9092");
var kafkaOptions = new KafkaOptions(uri);
var brokerRouter = new BrokerRouter(kafkaOptions);
var consumerOptions = new ConsumerOptions(receivedTopic, brokerRouter);
var consumer = new Consumer(consumerOptions);
foreach (var msg in consumer.Consume())
{
var value = Encoding.UTF8.GetString(msg.Value);
// Process value here
}
}
此外,在您的 KafkaOptions
和 ConsumerOptions
中启用日志,它们会对您有很大帮助:
var kafkaOptions = new KafkaOptions(uri)
{
Log = new ConsoleLog()
};
var consumerOptions = new ConsumerOptions(topic, brokerRouter)
{
Log = new ConsoleLog()
});
我转而使用 Confluent 的 C# .NET 程序包,现在可以使用了。
我有一个基本的生产者应用程序和一个消费者应用程序。如果我 运行 都开始关注各自的主题,那么我就有了一个很棒的工作系统。我的想法是,如果我启动生产者并发送一条消息,我就可以启动消费者并让它接收该消息。我错了
除非两者都启动并且 运行ning,否则我会丢失消息(或者它们不会被消耗)。
我的消费者应用看起来像这样用于消费...
Uri uri = new Uri("http://localhost:9092");
KafkaOptions options = new KafkaOptions(uri);
BrokerRouter brokerRouter = new BrokerRouter(options);
Consumer consumer = new Consumer(new ConsumerOptions(receiveTopic, brokerRouter));
List<OffsetResponse> offset = consumer.GetTopicOffsetAsync(receiveTopic, 100000).Result;
IEnumerable<OffsetPosition> t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());
IEnumerable<KafkaNet.Protocol.Message> msgs = consumer.Consume();
foreach (KafkaNet.Protocol.Message msg in msgs)
{
do some stuff here based on the message received
}
除非我有字里行间的代码,否则每次启动应用程序时它都会从头开始。 管理主题偏移以便在断开连接发生后使用消息的正确方法是什么?
如果我运行
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic chat-message-reply-XXX consumer-property fetch-size=40000000 --from-beginning
我可以看到消息,但是当我将我的应用程序连接到该主题时,consumer.Consume() 不会接收它尚未看到的消息。我在使用和不使用 运行 上述 bat 文件的情况下进行了尝试,以查看是否有任何区别。当我查看 consumer.SetOffsetPosition(t.ToArray()) 调用(特别是 t)时,它显示偏移量是该主题的所有消息的计数。
请帮忙,
将 ConsumerOptions
中的 auto.offset.reset
配置设置为 earliest
。当消费者组开始消费消息时,它会从最新的偏移量开始消费,因为auto.offset.reset
的默认值是最新的。
但是我现在看kafka-netAPI,没有AutoOffsetReset
属性,好像其在消费者中的配置相当不足。它还缺少方法摘要文档。
我建议您使用 Confluent .NET Kafka Nuget 包,因为它属于 Confluent 本身。
此外,为什么要调用 GetTopicOffsets
并在消费者中再次设置该偏移量。我认为当你配置你的消费者时,你应该开始使用 Consume()
.
试试这个:
static void Main(string[] args)
{
var uri = new Uri("http://localhost:9092");
var kafkaOptions = new KafkaOptions(uri);
var brokerRouter = new BrokerRouter(kafkaOptions);
var consumerOptions = new ConsumerOptions(receivedTopic, brokerRouter);
var consumer = new Consumer(consumerOptions);
foreach (var msg in consumer.Consume())
{
var value = Encoding.UTF8.GetString(msg.Value);
// Process value here
}
}
此外,在您的 KafkaOptions
和 ConsumerOptions
中启用日志,它们会对您有很大帮助:
var kafkaOptions = new KafkaOptions(uri)
{
Log = new ConsoleLog()
};
var consumerOptions = new ConsumerOptions(topic, brokerRouter)
{
Log = new ConsoleLog()
});
我转而使用 Confluent 的 C# .NET 程序包,现在可以使用了。