Kafka Connect - 仅在存在时转换重命名字段
Kafka Connect - Transformes rename field only if it exist
我有一个用于多个主题的 S3 接收器连接器(topic_a、topic_b、topic_c)并且 topic_a 有字段 created_date 和 topic_b, topic_c 有 creation_date 。我已经使用下面的 transforms.RenameField.renames
重命名字段 (created_date:creation_date) 但是因为只有 topic_a 有 created_date 而其他人没有,连接器出现故障。
我想使用 creation_date 将所有消息(来自具有单个连接器的所有主题)移动到 s3(如果存在,将 created_date 重命名为 creation_date)但我不能找出用于为特定主题重命名字段(如果存在)的正则表达式或转换器。
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"errors.log.include.messages":"true",
"s3.region":"eu-west-1",
"topics.dir":"dir",
"flush.size":"5",
"tasks.max":"2",
"s3.part.size":"5242880",
"timezone":"UTC",
"locale":"en",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"errors.log.enable":"true",
"s3.bucket.name":"bucket",
"topics": "topic_a, topic_b, topic_c",
"s3.compression.type":"gzip",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"name":"NAME",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"key.converter.schemas.enable":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"true",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"https://schemaregistry.com",
"enhanced.avro.schema.support": "true",
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "created_date:creation_date"
}
only topic_a have created_date and others don't,
然后您将使用单独的连接器。一个带有转换,所有主题都带有字段,另一个没有转换。
from all topics with single connector
这不能很好地扩展。您正在制作有限的消费者线程和一个消费者组来同时阅读许多主题。多个连接器会更好地分配负载。
我有一个用于多个主题的 S3 接收器连接器(topic_a、topic_b、topic_c)并且 topic_a 有字段 created_date 和 topic_b, topic_c 有 creation_date 。我已经使用下面的 transforms.RenameField.renames
重命名字段 (created_date:creation_date) 但是因为只有 topic_a 有 created_date 而其他人没有,连接器出现故障。
我想使用 creation_date 将所有消息(来自具有单个连接器的所有主题)移动到 s3(如果存在,将 created_date 重命名为 creation_date)但我不能找出用于为特定主题重命名字段(如果存在)的正则表达式或转换器。
"config":{
"connector.class":"io.confluent.connect.s3.S3SinkConnector",
"errors.log.include.messages":"true",
"s3.region":"eu-west-1",
"topics.dir":"dir",
"flush.size":"5",
"tasks.max":"2",
"s3.part.size":"5242880",
"timezone":"UTC",
"locale":"en",
"format.class":"io.confluent.connect.s3.format.json.JsonFormat",
"errors.log.enable":"true",
"s3.bucket.name":"bucket",
"topics": "topic_a, topic_b, topic_c",
"s3.compression.type":"gzip",
"partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
"name":"NAME",
"storage.class":"io.confluent.connect.s3.storage.S3Storage",
"key.converter.schemas.enable":"true",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"true",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"https://schemaregistry.com",
"enhanced.avro.schema.support": "true",
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "created_date:creation_date"
}
only topic_a have created_date and others don't,
然后您将使用单独的连接器。一个带有转换,所有主题都带有字段,另一个没有转换。
from all topics with single connector
这不能很好地扩展。您正在制作有限的消费者线程和一个消费者组来同时阅读许多主题。多个连接器会更好地分配负载。