使用 SCS 删除消费消息的 kafka 日志
delete kafka logs for consumed messages, using SCS
我是使用 kafka 和 spring 云流的新手。需要一些帮助。
设置
- 我有两个 spring-引导应用程序 App-1、App-2。
- 我正在使用 spring 云流 和 spring-cloud-stream-binder-kafka异步通信。
- 有一个主题TOPIC-1
用例
- 假设 App-1 发送了一条关于主题 TOPIC-1 的消息,App-2 正在收听该消息。
- App-2 使用了消息并成功处理了它。
- 现在该主题的偏移量增加了。
问题
- 如何实现一种机制,在指定时间段后从 kafka 日志中删除唯一成功消费的消息数据?
在 Kafka 中,消费的责任是消费者的责任。所以我想,spring 云流 kafka 中一定有一些我不知道的 kafka 消息日志控制机制。
注意 1:我知道 kafka 日志保留时间和磁盘属性。但是kafka日志即使对于未消费的消息也会被删除。
注意 2:我已经完成了 但它无济于事。
据我所知,在 Kafka 中没有这样的机制;当然不在 Spring Cloud Stream 或它所基于的库中。 Kafka 客户端无法访问此类低级构造。
此外,消费者偏移量与主题日志完全分开;在现代经纪人中,它们存储在一个特殊的主题中。
编辑
根据下面的评论,可以使用 kafka-delete-records.sh
命令行工具。
请注意,这使用默认情况下不在 SCSt 类路径上的 scala AdminClient
(自 2.0 起)。
但是,java AdminClient
支持类似的功能:
/**
* Delete records whose offset is smaller than the given offset of the corresponding partition.
*
* This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
* @return The DeleteRecordsResult.
*/
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}
您可以使用引导程序的 AutoConfiguration
KafkaAdmin
创建一个 AdminClient
。
AdminClient client = AdminClient.create(kafkaAdmin.getConfig());
我是使用 kafka 和 spring 云流的新手。需要一些帮助。
设置
- 我有两个 spring-引导应用程序 App-1、App-2。
- 我正在使用 spring 云流 和 spring-cloud-stream-binder-kafka异步通信。
- 有一个主题TOPIC-1
用例
- 假设 App-1 发送了一条关于主题 TOPIC-1 的消息,App-2 正在收听该消息。
- App-2 使用了消息并成功处理了它。
- 现在该主题的偏移量增加了。
问题
- 如何实现一种机制,在指定时间段后从 kafka 日志中删除唯一成功消费的消息数据?
在 Kafka 中,消费的责任是消费者的责任。所以我想,spring 云流 kafka 中一定有一些我不知道的 kafka 消息日志控制机制。
注意 1:我知道 kafka 日志保留时间和磁盘属性。但是kafka日志即使对于未消费的消息也会被删除。
注意 2:我已经完成了
据我所知,在 Kafka 中没有这样的机制;当然不在 Spring Cloud Stream 或它所基于的库中。 Kafka 客户端无法访问此类低级构造。
此外,消费者偏移量与主题日志完全分开;在现代经纪人中,它们存储在一个特殊的主题中。
编辑
根据下面的评论,可以使用 kafka-delete-records.sh
命令行工具。
请注意,这使用默认情况下不在 SCSt 类路径上的 scala AdminClient
(自 2.0 起)。
但是,java AdminClient
支持类似的功能:
/**
* Delete records whose offset is smaller than the given offset of the corresponding partition.
*
* This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param recordsToDelete The topic partitions and related offsets from which records deletion starts.
* @return The DeleteRecordsResult.
*/
public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete) {
return deleteRecords(recordsToDelete, new DeleteRecordsOptions());
}
您可以使用引导程序的 AutoConfiguration
KafkaAdmin
创建一个 AdminClient
。
AdminClient client = AdminClient.create(kafkaAdmin.getConfig());