如何从特定日期接收来自kafka的数据
How receive data from kafka from specific date
我想订阅主题并接收特定日期的数据。方法调用抛出异常:
Confluent.Kafka.KafkaException: Local: Erroneous state
我的代码:
adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
foreach (var p in partitions)
{
consumer.Assign(new TopicPartition(_config.Topic, p.PartitionId));
}
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
//await Task.Delay(1000);
foreach (var o in offsets)
{
consumer.Seek(o);
}
如果我添加等待:await Task.Delay(1000);
。方法 Seek()
不会抛出异常。我如何按日期设置偏移量,没有 Task.Delay
?
下一个代码正确!
var adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
foreach (var o in offsets)
{
consumer.Assign(o);
}
我想订阅主题并接收特定日期的数据。方法调用抛出异常:
Confluent.Kafka.KafkaException: Local: Erroneous state
我的代码:
adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
foreach (var p in partitions)
{
consumer.Assign(new TopicPartition(_config.Topic, p.PartitionId));
}
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
//await Task.Delay(1000);
foreach (var o in offsets)
{
consumer.Seek(o);
}
如果我添加等待:await Task.Delay(1000);
。方法 Seek()
不会抛出异常。我如何按日期设置偏移量,没有 Task.Delay
?
下一个代码正确!
var adminClient = new AdminClientBuilder(_kafkaConfig.AsEnumerable()).Build();
var topicMetadata = adminClient.GetMetadata(_config.Topic, TimeSpan.FromSeconds(2));
var partitions = topicMetadata
.Topics
.First(x => x.Topic == _config.Topic)
.Partitions;
var partitionsOffsets = partitions
.Select(x => new TopicPartitionTimestamp(_config.Topic, x.PartitionId, new Timestamp(_config.OffsetDateUtc)));
consumer = CreateConsumer();
var offsets = consumer.OffsetsForTimes(partitionsOffsets, TimeSpan.FromSeconds(2));
foreach (var o in offsets)
{
consumer.Assign(o);
}