Kafka Connect - 仅在存在时转换重命名字段

Kafka Connect - Transformes rename field only if it exist

我有一个用于多个主题的 S3 接收器连接器(topic_a、topic_b、topic_c)并且 topic_a 有字段 created_date 和 topic_b, topic_c 有 creation_date 。我已经使用下面的 transforms.RenameField.renames 重命名字段 (created_date:creation_date) 但是因为只有 topic_a 有 created_date 而其他人没有,连接器出现故障。

我想使用 creation_date 将所有消息(来自具有单个连接器的所有主题)移动到 s3(如果存在,将 created_date 重命名为 creation_date)但我不能找出用于为特定主题重命名字段(如果存在)的正则表达式或转换器。

   "config":{
      "connector.class":"io.confluent.connect.s3.S3SinkConnector",
      "errors.log.include.messages":"true",
      "s3.region":"eu-west-1",
      "topics.dir":"dir",
      "flush.size":"5",
      "tasks.max":"2",
      "s3.part.size":"5242880",
      "timezone":"UTC",
      "locale":"en",
      "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
      "errors.log.enable":"true",
      "s3.bucket.name":"bucket",
      "topics": "topic_a, topic_b, topic_c",
      "s3.compression.type":"gzip",
      "partitioner.class":"io.confluent.connect.storage.partitioner.DailyPartitioner",
      "name":"NAME",
      "storage.class":"io.confluent.connect.s3.storage.S3Storage",
      "key.converter.schemas.enable":"true",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":"true",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"https://schemaregistry.com",
      "enhanced.avro.schema.support": "true",
      "transforms": "RenameField",
      "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
      "transforms.RenameField.renames": "created_date:creation_date"
   }

only topic_a have created_date and others don't,

然后您将使用单独的连接器。一个带有转换,所有主题都带有字段,另一个没有转换。

from all topics with single connector

这不能很好地扩展。您正在制作有限的消费者线程和一个消费者组来同时阅读许多主题。多个连接器会更好地分配负载。