我可以让 Kafka consumers/sink 连接以跳过主题中的特定分区吗?
Can I have Kafka consumers/sink connects to skip specific partitions within a topic?
Kafka Connect 中的任何选项,用于指定专门从哪个分区读取消息。基本上,我正在 Kafka Connects 中寻找一个选项来手动分配要读取的分区列表。
类似于KafkaConsumer中的assign()方法API
您不能只监听 Kafka Connect 中的特定分区。
但是您可以实现仅从特定分区插入消息的功能。
要拥有这样的功能,您需要实现您的自定义 Transformation
。
如果 Transformation
returns null
消息被跳过,那么您的自定义 Transformation
必须 return null
不需要的分区。
示例代码如下:
public class PartitionFilter <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
int neededPartition = 1; // some parititon
if (record.kafkaPartition() != neededPartition)
return null;
return record;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
可以找到有关转换的更多信息:https://kafka.apache.org/documentation/#connect_transforms
Kafka Connect 中的任何选项,用于指定专门从哪个分区读取消息。基本上,我正在 Kafka Connects 中寻找一个选项来手动分配要读取的分区列表。
类似于KafkaConsumer中的assign()方法API
您不能只监听 Kafka Connect 中的特定分区。
但是您可以实现仅从特定分区插入消息的功能。
要拥有这样的功能,您需要实现您的自定义 Transformation
。
如果 Transformation
returns null
消息被跳过,那么您的自定义 Transformation
必须 return null
不需要的分区。
示例代码如下:
public class PartitionFilter <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
int neededPartition = 1; // some parititon
if (record.kafkaPartition() != neededPartition)
return null;
return record;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
可以找到有关转换的更多信息:https://kafka.apache.org/documentation/#connect_transforms