使用 SCS 删除消费消息的 kafka 日志

delete kafka logs for consumed messages, using SCS

我是使用 kafka 和 spring 云流的新手。需要一些帮助。

设置


用例

问题

在 Kafka 中,消费的责任是消费者的责任。所以我想,spring 云流 kafka 中一定有一些我不知道的 kafka 消息日志控制机制。


注意 1:我知道 kafka 日志保留时间和磁盘属性。但是kafka日志即使对于未消费的消息也会被删除。

注意 2:我已经完成了 但它无济于事。

据我所知,在 Kafka 中没有这样的机制;当然不在 Spring Cloud Stream 或它所基于的库中。 Kafka 客户端无法访问此类低级构造。

此外,消费者偏移量与主题日志完全分开;在现代经纪人中,它们存储在一个特殊的主题中。

编辑

根据下面的评论,可以使用 kafka-delete-records.sh 命令行工具。

请注意,这使用默认情况下不在 SCSt 类路径上的 scala AdminClient(自 2.0 起)。

但是,java AdminClient 支持类似的功能:

/**
 * Delete records whose offset is smaller than the given offset of the corresponding partition.
 *
 * This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
 * See the overload for more details.
 *
 * This operation is supported by brokers with version 0.11.0.0 or higher.
 *
 * @param recordsToDelete       The topic partitions and related offsets from which records deletion starts.
 * @return                      The DeleteRecordsResult.
 */
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
    return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}

您可以使用引导程序的 AutoConfiguration KafkaAdmin 创建一个 AdminClient

AdminClient client = AdminClient.create(kafkaAdmin.getConfig());