When I use the same key, Kafka JDBC Connect is not publishing the messages to one partition
Messages with the same key should be sent to the same partition of a topic, but the Kafka JDBC source connector is publishing messages to different partitions.
I created a topic (student-topic-in) with 5 partitions.
I created a student table with the following script:
CREATE TABLE student (
std_id INT AUTO_INCREMENT PRIMARY KEY,
std_name VARCHAR(50),
class_name VARCHAR(50),
father_name VARCHAR(50),
mother_name VARCHAR(50),
school VARCHAR(50)
);
My JDBC source-quickstart properties file is as follows:
query= select * from student
tasks.max=1
mode=incrementing
incrementing.column.name=std_id
topic.prefix=student-topic-in
numeric.mapping=best_fit
timestamp.delay.interval.ms=10
transforms=CreateKey,ExtractKey,ConvertDate,Replace,InsertPartition,InsertTopic
transforms.CreateKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.CreateKey.fields=class_name
transforms.ExtractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractKey.field=class_name
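The two key-related transforms above act as a pipeline on each record: ValueToKey copies the listed value field into a new key struct, and ExtractField$Key then pulls that single field out, so the record key ends up as the plain string value of class_name. A minimal sketch of that behavior, using plain Java maps in place of Connect's Struct (the class and method names here are illustrative, not Connect API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation of ValueToKey + ExtractField$Key on one record.
public class KeyPipeline {

    // ValueToKey with fields=class_name: build a key struct from the value.
    static Map<String, Object> createKey(Map<String, Object> value) {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("class_name", value.get("class_name"));
        return key;
    }

    // ExtractField$Key with field=class_name: replace the struct by the one field.
    static Object extractKey(Map<String, Object> key) {
        return key.get("class_name");
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("std_id", 145);
        row.put("class_name", "15");
        Object key = extractKey(createKey(row));
        System.out.println(key); // the record key is now the plain string "15"
    }
}
```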
When I insert student details for the same class into the database table, all of the messages are published to one partition.
student-topic-in 3 "15" @ 35: {"std_id":145,"std_name":"pranavi311","class_name":"15","father_name":"abcd1","mother_name":"efgh1","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 36: {"std_id":146,"std_name":"pranavi321","class_name":"15","father_name":"abcd2","mother_name":"efgh2","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 37: {"std_id":147,"std_name":"pranavi331","class_name":"15","father_name":"abcd3","mother_name":"efgh3","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 38: {"std_id":148,"std_name":"pranavi341","class_name":"15","father_name":"abcd4","mother_name":"efgh4","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 39: {"std_id":149,"std_name":"pranavi351","class_name":"15","father_name":"abcd5","mother_name":"efgh5","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 40: {"std_id":150,"std_name":"pranavi361","class_name":"15","father_name":"abcd6","mother_name":"efgh6","school_name":"CSI","partition":null,"topic":"student-topic-in"}
% Reached end of topic student-topic-in [3] at offset 41
However, if I insert student details for different classes, they are still published to one partition.
student-topic-in 3 "11" @ 41: {"std_id":151,"std_name":"pranavi311","class_name":"11","father_name":"abcd1","mother_name":"efgh1","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "12" @ 42: {"std_id":152,"std_name":"pranavi321","class_name":"12","father_name":"abcd2","mother_name":"efgh2","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "13" @ 43: {"std_id":153,"std_name":"pranavi331","class_name":"13","father_name":"abcd3","mother_name":"efgh3","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "14" @ 44: {"std_id":154,"std_name":"pranavi341","class_name":"14","father_name":"abcd4","mother_name":"efgh4","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 3 "15" @ 45: {"std_id":155,"std_name":"pranavi351","class_name":"15","father_name":"abcd5","mother_name":"efgh5","school_name":"CSI","partition":null,"topic":"student-topic-in"}
student-topic-in 0 "16" @ 31: {"std_id":156,"std_name":"pranavi361","class_name":"16","father_name":"abcd6","mother_name":"efgh6","school_name":"CSI","partition":null,"topic":"student-topic-in"}
% Reached end of topic student-topic-in [3] at offset 46
I am printing the details with the following command.
kafkacat -b localhost:9092 -C -t student-topic-in -f '%t %p %k @ %o: %s\n'
My expectation is that each class's student messages should be published to one specific partition (in the JDBC connector I specified class_name as the key), but it is not working.
What exactly am I missing? How do I publish each class's students to a specific partition?
I solved this issue by using the string converter:
key.converter=org.apache.kafka.connect.storage.StringConverter
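Why changing the converter changes the partitioning: with schemas disabled, JsonConverter serializes the key as a JSON string, so the bytes on the wire include the surrounding quote character (0x22 = 34), while StringConverter writes only the raw UTF-8 bytes. Different key bytes feed the partitioner, so the keys spread across partitions differently. A small sketch of the byte-level difference (plain Java, not the converter classes themselves):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyBytesDemo {
    // Roughly what JsonConverter emits for a schemaless string key:
    // the JSON encoding, i.e. the string wrapped in double quotes.
    static byte[] jsonKeyBytes(String key) {
        return ("\"" + key + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Roughly what StringConverter emits: the raw UTF-8 bytes, no quotes.
    static byte[] stringKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(jsonKeyBytes("15")));   // [34, 49, 53, 34]
        System.out.println(Arrays.toString(stringKeyBytes("15"))); // [49, 53]
    }
}
```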
Everything is working correctly in your case.
If you look at the Kafka Connect source code, you can see in the WorkerSourceTask::sendRecords method that the transformations are applied to each record before the Producer sends it, and the message is then converted to a byte array by the Converter:
private boolean sendRecords() {
...
final SourceRecord record = transformationChain.apply(preTransformRecord);
final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
...
}
In your case the transformations are CreateKey,ExtractKey,ConvertDate,Replace,InsertPartition,InsertTopic, and the converter is org.apache.kafka.connect.json.JsonConverter.
The converter maps your key and its schema to a byte array, which is then sent to Kafka.
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
try {
return serializer.serialize(topic, jsonValue);
} catch (SerializationException e) {
throw new DataException("Converting Kafka Connect data to byte[] failed due to serialization error: ", e);
}
}
You have disabled schemas, so after that call the key bytes are:
- 11: serializer.serialize(topic, new TextNode("11")) = [34,49,49,34]
- 12: serializer.serialize(topic, new TextNode("12")) = [34,49,50,34]
- 13: serializer.serialize(topic, new TextNode("13")) = [34,49,51,34]
- 14: serializer.serialize(topic, new TextNode("14")) = [34,49,52,34]
- 15: serializer.serialize(topic, new TextNode("15")) = [34,49,53,34]
- 16: serializer.serialize(topic, new TextNode("16")) = [34,49,54,34]
Each message is sent to some partition by the Producer. Which partition a message goes to depends on the Partitioner (org.apache.kafka.clients.producer.Partitioner). Kafka Connect uses the default one: org.apache.kafka.clients.producer.internals.DefaultPartitioner.
Under the hood, DefaultPartitioner computes the partition with the following function:
org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
If you apply it to your parameters (5 partitions and your keys' byte arrays), you get the following:
Utils.toPositive(Utils.murmur2(new byte[]{34,49,49,34})) % 5 = 3
Utils.toPositive(Utils.murmur2(new byte[]{34,49,50,34})) % 5 = 3
Utils.toPositive(Utils.murmur2(new byte[]{34,49,51,34})) % 5 = 3
Utils.toPositive(Utils.murmur2(new byte[]{34,49,52,34})) % 5 = 3
Utils.toPositive(Utils.murmur2(new byte[]{34,49,53,34})) % 5 = 3
Utils.toPositive(Utils.murmur2(new byte[]{34,49,54,34})) % 5 = 0
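You can reproduce these numbers without a broker by copying DefaultPartitioner's arithmetic into a standalone class. The murmur2 and toPositive bodies below are transcribed from org.apache.kafka.common.utils.Utils to the best of my knowledge; treat this as a sketch, and prefer calling the real Utils class from kafka-clients if it is on your classpath. The helper name partitionFor is ours:

```java
// Standalone re-implementation of the DefaultPartitioner hash path,
// transcribed from org.apache.kafka.common.utils.Utils.
public class PartitionCalc {

    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // murmur2 as used by Kafka's DefaultPartitioner (seed 0x9747b28c).
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff; h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String serializedKey, int numPartitions) {
        byte[] keyBytes = serializedKey.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String cls : new String[]{"11", "12", "13", "14", "15", "16"}) {
            // The key on the wire is the JSON-quoted string, e.g. "\"15\"".
            System.out.println(cls + " -> partition " + partitionFor("\"" + cls + "\"", 5));
        }
    }
}
```

This should print partition 3 for classes 11-15 and partition 0 for class 16, matching the kafkacat output above. If you need a deterministic class-to-partition mapping, hashing alone cannot guarantee it; you would have to plug a custom Partitioner into the producer used by Connect.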
I hope that more or less explains what is happening and why.