How to force log compaction of a Kafka topic?

Using Kafka 2.7.0 (in K8s), I created a test topic with cleanup.policy=compact:

./kafka-topics.sh --create --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02 --partitions 1 --replication-factor 3 --config cleanup.policy=compact

wrote some messages to it:

kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:
1:a
2:b
3:c
1:d
2:e

and changed the topic settings so that compaction should kick in after 10 seconds:

./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --config max.compaction.lag.ms=10000 --config min.cleanable.dirty.ratio=0.0 --config segment.ms=10000 --config delete.retention.ms=10000

Then I waited a bit, just to be sure:

sleep 60

and checked the topic content:

kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:

To my surprise, the content was still

1:a
2:b
3:c
1:d
2:e

rather than

3:c
1:d
2:e

as I expected.
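For reference, the expected output is just the latest record per key, kept in offset order. A minimal shell/awk sketch of that rule, where the function name `compact` is hypothetical and the code models only the end result, not how the broker's log cleaner works internally:

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "key:value" lines (in offset order) on stdin and
# print only the latest record for each key, preserving offset order.
compact() {
  awk -F: '
    { key[NR] = $1; rec[NR] = $0; last[$1] = NR }   # remember the last offset per key
    END { for (i = 1; i <= NR; i++) if (last[key[i]] == i) print rec[i] }
  '
}

printf '1:a\n2:b\n3:c\n1:d\n2:e\n' | compact
```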

Why was the topic not compacted, and is there a way to force compaction?

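Since the active (newest) segment is never eligible for compaction, the trick is to write to the topic again after segment.ms has elapsed, which forces Kafka to roll a new segment.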
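To see why the script below writes twice, here is a rough simulation of a single cleaner pass under a simplifying assumption: only records in closed segments are compacted, only keys found in closed segments can supersede older records, and the newest segment is treated as active and left untouched. The input format "segment key:value" and the function name `clean_pass` are made up for illustration; tombstones, the dirty ratio and other cleaner details are ignored.

```shell
#!/usr/bin/env bash
# Hypothetical simulation of one log-cleaner pass.
# Input: "segment key:value" lines in offset order; the highest segment
# number is treated as the active segment, which is never cleaned.
clean_pass() {
  awk '
    {
      seg[NR] = $1 + 0
      split($2, kv, ":"); key[NR] = kv[1]; rec[NR] = $2
      if (NR == 1 || seg[NR] > active) active = seg[NR]
    }
    END {
      # Latest offset per key, looking only at closed (non-active) segments.
      for (i = 1; i <= NR; i++) if (seg[i] < active) last[key[i]] = i
      # Keep the whole active segment, plus the latest record per key elsewhere.
      for (i = 1; i <= NR; i++) if (seg[i] == active || last[key[i]] == i) print rec[i]
    }'
}

# One extra write: 1:d and 2:e sit in the active segment, so nothing is removed.
printf '0 1:a\n0 2:b\n0 3:c\n1 1:d\n1 2:e\n' | clean_pass

# A second extra write closes segment 1, so 1:a and 2:b are superseded.
printf '0 1:a\n0 2:b\n0 3:c\n1 1:d\n1 2:e\n2 1:d\n2 2:e\n' | clean_pass
```

Under this model, the second write is what lets 1:d and 2:e replace 1:a and 2:b, while the records in the new active segment remain as duplicates until it is rolled too.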

# Create a test topic.
./kafka-topics.sh --create --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02 --partitions 1 --replication-factor 3 --config cleanup.policy=compact

# Write some messages to it.
# Use printf: bash's builtin echo does not expand "\n" without -e.
printf '1:a\n2:b\n3:c\n' | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:

# Check the topic content.
kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:

# Change the topic settings in a way such that compaction should kick in after 10 seconds.
./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --config max.compaction.lag.ms=10000 --config min.cleanable.dirty.ratio=0.0 --config segment.ms=10000 --config delete.retention.ms=10000

# Wait until the current segment is older than segment.ms (10 seconds).
sleep 11

# Write new messages.
printf '1:d\n2:e\n' | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:

# Check the topic content.
kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:

# Wait until this segment is older than segment.ms as well.
sleep 11

# Write new messages again.
printf '1:d\n2:e\n' | kafkacat -b kafka.core-kafka.svc.cluster.local:9092 -P -t _test_quick_compaction_2021_12_02 -K:

# Check the topic content.
kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:

# Wait for compaction to happen.
sleep 11

# Check the topic content to validate that it has been compacted.
kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K:

# Revert the setting changes.
./kafka-topics.sh --alter --zookeeper zookeeper.core-kafka.svc.cluster.local --topic _test_quick_compaction_2021_12_02 --delete-config max.compaction.lag.ms --delete-config min.cleanable.dirty.ratio --delete-config segment.ms --delete-config delete.retention.ms

# Delete the topic.
# ./kafka-topics.sh --delete --bootstrap-server kafka.core-kafka.svc.cluster.local:9092 --topic _test_quick_compaction_2021_12_02
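A fixed `sleep 11` before the final check may be too short, because the cleaner threads only wake up periodically when they have nothing to do (broker setting `log.cleaner.backoff.ms`, 15 seconds by default). Polling is more robust. A minimal sketch, where `wait_until` is a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command once per second until it succeeds,
# giving up after the given number of seconds.
wait_until() {
  local timeout_s=$1
  shift
  local elapsed=0
  until "$@"; do
    if [ "$elapsed" -ge "$timeout_s" ]; then
      echo "timed out after ${timeout_s}s: $*" >&2
      return 1
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
}

# Example against the test topic (needs a reachable broker):
# wait_until 120 sh -c '[ "$(kafkacat -C -e -o beginning -b kafka.core-kafka.svc.cluster.local:9092 -t _test_quick_compaction_2021_12_02 -K: | head -n 1)" = "3:c" ]'
```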