Kafka 中主题的偏移量是如何工作的 (Kafka_net)

How does the offset for a topic work in Kafka (Kafka_net)

我有一个基本的生产者应用程序和一个消费者应用程序。如果我 运行 都开始关注各自的主题,那么我就有了一个很棒的工作系统。我的想法是,如果我启动生产者并发送一条消息,我就可以启动消费者并让它接收该消息。我错了

除非两者都启动并且 运行ning,否则我会丢失消息(或者它们不会被消耗)。

我的消费者应用看起来像这样用于消费...

Uri uri = new Uri("http://localhost:9092");

KafkaOptions options = new KafkaOptions(uri);                
BrokerRouter brokerRouter = new BrokerRouter(options);
Consumer consumer = new Consumer(new ConsumerOptions(receiveTopic, brokerRouter));
             

List<OffsetResponse> offset = consumer.GetTopicOffsetAsync(receiveTopic, 100000).Result;

IEnumerable<OffsetPosition> t = from x in offset select new OffsetPosition(x.PartitionId, x.Offsets.Max());
consumer.SetOffsetPosition(t.ToArray());
IEnumerable<KafkaNet.Protocol.Message> msgs = consumer.Consume();

foreach (KafkaNet.Protocol.Message msg in msgs)
{
    do some stuff here based on the message received
}

除非我有字里行间的代码,否则每次启动应用程序时它都会从头开始。 管理主题偏移以便在断开连接发生后使用消息的正确方法是什么?

如果我运行

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic chat-message-reply-XXX consumer-property fetch-size=40000000 --from-beginning 

我可以看到消息,但是当我将我的应用程序连接到该主题时,consumer.Consume() 不会接收它尚未看到的消息。我在使用和不使用 运行 上述 bat 文件的情况下进行了尝试,以查看是否有任何区别。当我查看 consumer.SetOffsetPosition(t.ToArray()) 调用(特别是 t)时,它显示偏移量是该主题的所有消息的计数。

请帮忙,

ConsumerOptions 中的 auto.offset.reset 配置设置为 earliest。当消费者组开始消费消息时,它会从最新的偏移量开始消费,因为auto.offset.reset的默认值是最新的。

但是我现在看kafka-netAPI,没有AutoOffsetReset属性,好像其在消费者中的配置相当不足。它还缺少方法摘要文档。

我建议您使用 Confluent .NET Kafka Nuget 包,因为它属于 Confluent 本身。

此外,为什么要调用 GetTopicOffsets 并在消费者中再次设置该偏移量。我认为当你配置你的消费者时,你应该开始使用 Consume().

阅读消息

试试这个:

static void Main(string[] args)
{
    var uri = new Uri("http://localhost:9092");

    var kafkaOptions = new KafkaOptions(uri);                
    var brokerRouter = new BrokerRouter(kafkaOptions);
    var consumerOptions = new ConsumerOptions(receivedTopic, brokerRouter);
    var consumer = new Consumer(consumerOptions);

    foreach (var msg in consumer.Consume())
    {
        var value = Encoding.UTF8.GetString(msg.Value);

        // Process value here
    }
}

此外,在您的 KafkaOptionsConsumerOptions 中启用日志,它们会对您有很大帮助:

var kafkaOptions = new KafkaOptions(uri)
            {
                Log = new ConsoleLog()
            };
var consumerOptions = new ConsumerOptions(topic, brokerRouter)
            {
                Log = new ConsoleLog()
            });

我转而使用 Confluent 的 C# .NET 程序包,现在可以使用了。