How to receive data from Kafka starting from a specific date

I want to subscribe to a topic and receive data starting from a specific date. The call to Seek() throws an exception:

Confluent.Kafka.KafkaException: Local: Erroneous state

My code:

adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
    .Topics
    .First(x => x.Topic == _config.Topic)
    .Partitions;
var partitionsOffsets = partitions
    .Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));

consumer = CreateConsumer();

foreach (var p in partitions)
{
    consumer.Assign(new TopicPartition(_config.Topic, p.PartitionId));
}

var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));

//await Task.Delay(1000);

foreach (var o in offsets)
{
    consumer.Seek(o);
}

If I add a wait, await Task.Delay(1000);, then Seek() does not throw. How can I set the offsets by date without Task.Delay?

The following code works correctly!

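// The admin client is only needed for the metadata lookup, so dispose it when the scope ends.
using var adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();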
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
    .Topics
    .First(x => x.Topic == _config.Topic)
    .Partitions;
var partitionsOffsets = partitions
    .Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));

consumer = CreateConsumer();

var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));

// Assign replaces the current assignment on every call, so pass all partitions at once.
consumer.Assign(offsets);
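
A likely explanation for the difference, as far as I understand the client: Assign() returns before the partitions are actually being fetched, and Seek() only works on a partition that is already being consumed, which would be why the delay hid the error. Passing the start offsets directly to Assign() avoids Seek() altogether. Below is a minimal sketch of that approach wrapped in a reusable helper. The class and method names (KafkaTimestampStart, BuildTimestampQueries, AssignFromTimestamp) are hypothetical and not part of Confluent.Kafka, and startUtc is assumed to have DateTimeKind.Utc.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class KafkaTimestampStart
{
    // Builds one timestamp lookup per partition. Pure helper, no broker access.
    // Assumption: startUtc has DateTimeKind.Utc.
    public static List<TopicPartitionTimestamp> BuildTimestampQueries(
        string topic, IEnumerable<int> partitionIds, DateTime startUtc)
    {
        var timestamp = new Timestamp(startUtc);
        return partitionIds
            .Select(id => new TopicPartitionTimestamp(topic, new Partition(id), timestamp))
            .ToList();
    }

    // Resolves the first offset at or after startUtc for every partition of the
    // topic and assigns all partitions to the consumer at those offsets.
    public static void AssignFromTimestamp<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        IAdminClient adminClient,
        string topic,
        DateTime startUtc,
        TimeSpan timeout)
    {
        var metadata = adminClient.GetMetadata(topic, timeout);
        var partitionIds = metadata.Topics
            .First(t => t.Topic == topic)
            .Partitions
            .Select(p => p.PartitionId);

        var offsets = consumer.OffsetsForTimes(
            BuildTimestampQueries(topic, partitionIds, startUtc), timeout);

        // A single Assign call with explicit offsets: no Seek and no delay needed.
        consumer.Assign(offsets);
    }
}
```

Usage would look like KafkaTimestampStart.AssignFromTimestamp(consumer, adminClient, _config.Topic, _config.OffsetDateUtc, TimeSpan.FromSeconds(2)); followed by the usual Consume() loop. For a partition with no message at or after the requested time, OffsetsForTimes is documented to return Offset.End, so that partition would start from new messages only.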