Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis
Kafka Connect Sink - From: Avro Topic, To: Json -> Redis
我有一个环境,我使用 Kafka Connect Worker 使用 Oracle 数据库中的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。
现在,我需要创建一个 Kafka Connect Sink 来消费这个 AVRO 消息,将其转换为 Json 然后写入 Redis 数据库。
到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我尝试使用转换器,但我可能误解了它们的用法。
下面是我对 worker 和 sink 的配置。
{
"name": "SOURCE",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "createKey, extractStr",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractStr.field": "ID",
"connection.url": "<>",
"connection.user": "<>",
"connection.password": "<>",
"table.whitelist": "V_TEST_C",
"schema.pattern": "<>",
"numeric.mapping": "best_fit",
"mode": "timestamp+incrementing",
"incrementing.column.name": "CID",
"timestamp.column.name": "TS_ULT_ALT",
"validate.non.null": "false",
"table.types": "VIEW",
"retention.ms":12000,
"poll.interval.ms": "30000",
"topic.prefix": "TEST.",
"value.converter.schema.registry.url": "<>"
}
}
下沉
{
"name": "SINK",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "V_TEST_C",
"redis.hosts": "redis:6379",
"schema.registry.url": "<>",
"value.converter.schema.registry.url": "<>",
"value.converter.schemas.enable":"false",
"key.converter.schemas.enable":"false",
"insert.mode": "UPSERT",
"delete.enabled": "false",
"quote.sql.identifier": "never"
}
}
pushes it in Kafka Topics on Avro format
那不是你发布的内容
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
在任何情况下,您都不能使用该 Redis 连接器存储除字符串或字节以外的任何内容
From docs(强调)
This connector expects records from Kafka to have a key and value that are stored as bytes or a string. If your data is already in Kafka in the format that you want in Redis consider using the ByteArrayConverter or the StringConverter for this connector
因此,无法使用 AvroConverter,因为它会生成结构化对象,而不是字符串或字节的架构。
注意:JSON禁用模式的转换器实际上与处理 JSON 数据的字符串转换器具有相同的行为。 但是,如果将 StringConverter 与 Avro 数据一起使用,您最终可能会得到类似 Struct{foo=bar}
的数据,但您仍然需要 运行 在某种程度上使用 Avro 解串器,所以我不确定您是否真的可以将 Json 或 StringConverter 与 Avro 主题数据一起使用。
我有一个环境,我使用 Kafka Connect Worker 使用 Oracle 数据库中的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。
现在,我需要创建一个 Kafka Connect Sink 来消费这个 AVRO 消息,将其转换为 Json 然后写入 Redis 数据库。
到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我尝试使用转换器,但我可能误解了它们的用法。
下面是我对 worker 和 sink 的配置。
{
"name": "SOURCE",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "createKey, extractStr",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractStr.field": "ID",
"connection.url": "<>",
"connection.user": "<>",
"connection.password": "<>",
"table.whitelist": "V_TEST_C",
"schema.pattern": "<>",
"numeric.mapping": "best_fit",
"mode": "timestamp+incrementing",
"incrementing.column.name": "CID",
"timestamp.column.name": "TS_ULT_ALT",
"validate.non.null": "false",
"table.types": "VIEW",
"retention.ms":12000,
"poll.interval.ms": "30000",
"topic.prefix": "TEST.",
"value.converter.schema.registry.url": "<>"
}
}
下沉
{
"name": "SINK",
"config": {
"connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "V_TEST_C",
"redis.hosts": "redis:6379",
"schema.registry.url": "<>",
"value.converter.schema.registry.url": "<>",
"value.converter.schemas.enable":"false",
"key.converter.schemas.enable":"false",
"insert.mode": "UPSERT",
"delete.enabled": "false",
"quote.sql.identifier": "never"
}
}
pushes it in Kafka Topics on Avro format
那不是你发布的内容
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
在任何情况下,您都不能使用该 Redis 连接器存储除字符串或字节以外的任何内容
From docs(强调)
This connector expects records from Kafka to have a key and value that are stored as bytes or a string. If your data is already in Kafka in the format that you want in Redis consider using the ByteArrayConverter or the StringConverter for this connector
因此,无法使用 AvroConverter,因为它会生成结构化对象,而不是字符串或字节的架构。
注意:JSON禁用模式的转换器实际上与处理 JSON 数据的字符串转换器具有相同的行为。 但是,如果将 StringConverter 与 Avro 数据一起使用,您最终可能会得到类似 Struct{foo=bar}
的数据,但您仍然需要 运行 在某种程度上使用 Avro 解串器,所以我不确定您是否真的可以将 Json 或 StringConverter 与 Avro 主题数据一起使用。