Kafka JDBC 连接器中的自定义分区分配
Custom partition assignment in Kafka JDBC connector
我有一个用例,我需要编写自定义逻辑以根据消息中的某些关键参数分配分区。我对此做了一些研究,发现 kafka 转换支持覆盖转换接口中的某些方法,但我无法在 git 集线器或其他地方执行一些示例代码。有人可以分享示例代码或 git hub link 在 kafka JDBC 源连接器中进行自定义分区分配吗?
提前致谢!
Kafka Connect默认分配分区使用:DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
)
如果您需要用一些自定义覆盖默认连接器,这是可能的,但您必须记住,覆盖适用于所有源连接器。
为此,您必须设置 producer.partitioner.class
属性,例如 producer.partitioner.class=com.example.CustomPartitioner
。
此外,您必须使用分区程序将 jar 复制到包含 Kafka Connect 库的目录。
变换方式:
Transformation也可以设置分区,但不是正确的做法。
从 Transformation
开始,您无权访问主题元数据,这对于分配分区至关重要。
如果您想为记录设置分区,代码应如下所示:
public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
private Integer calculatePartition(R record) {
// Partitions calcuation based on record information
return 0;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
我有一个用例,我需要编写自定义逻辑以根据消息中的某些关键参数分配分区。我对此做了一些研究,发现 kafka 转换支持覆盖转换接口中的某些方法,但我无法在 git 集线器或其他地方执行一些示例代码。有人可以分享示例代码或 git hub link 在 kafka JDBC 源连接器中进行自定义分区分配吗?
提前致谢!
Kafka Connect默认分配分区使用:DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
)
如果您需要用一些自定义覆盖默认连接器,这是可能的,但您必须记住,覆盖适用于所有源连接器。
为此,您必须设置 producer.partitioner.class
属性,例如 producer.partitioner.class=com.example.CustomPartitioner
。
此外,您必须使用分区程序将 jar 复制到包含 Kafka Connect 库的目录。
变换方式:
Transformation也可以设置分区,但不是正确的做法。
从 Transformation
开始,您无权访问主题元数据,这对于分配分区至关重要。
如果您想为记录设置分区,代码应如下所示:
public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
private Integer calculatePartition(R record) {
// Partitions calcuation based on record information
return 0;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}