Kafka Connect Sink - 从:Avro 主题,到:Json -> Redis

Kafka Connect Sink - From: Avro Topic, To: Json -> Redis

我有一个环境,我使用 Kafka Connect Worker 使用 Oracle 数据库中的一些数据,然后将其推送到 Avro 格式的 Kafka 主题中。

现在,我需要创建一个 Kafka Connect Sink 来消费这个 AVRO 消息,将其转换为 Json 然后写入 Redis 数据库。

到目前为止,我只能在 Redis 上写入我从主题中使用的相同 AVRO 消息。我尝试使用转换器,但我可能误解了它们的用法。

下面是我对 worker 和 sink 的配置。

{
    "name": "SOURCE",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "createKey, extractStr",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "ID",
        "transforms.extractStr.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractStr.field": "ID",
        "connection.url": "<>",
        "connection.user": "<>",
        "connection.password": "<>",
        "table.whitelist": "V_TEST_C",
        "schema.pattern": "<>",
        "numeric.mapping": "best_fit",
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "CID",
        "timestamp.column.name": "TS_ULT_ALT",
        "validate.non.null": "false",
        "table.types": "VIEW",
        "retention.ms":12000,
        "poll.interval.ms": "30000",
        "topic.prefix": "TEST.",
        "value.converter.schema.registry.url": "<>"
    }
}

下沉

{
  "name": "SINK",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "V_TEST_C",
    "redis.hosts": "redis:6379",
    "schema.registry.url": "<>",
    "value.converter.schema.registry.url": "<>",
    "value.converter.schemas.enable":"false",
    "key.converter.schemas.enable":"false",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "quote.sql.identifier": "never"
  }
}

pushes it in Kafka Topics on Avro format

那不是你发布的内容

"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

在任何情况下,您都不能使用该 Redis 连接器存储除字符串或字节以外的任何内容

From docs(强调)

This connector expects records from Kafka to have a key and value that are stored as bytes or a string. If your data is already in Kafka in the format that you want in Redis consider using the ByteArrayConverter or the StringConverter for this connector

因此,无法使用 AvroConverter,因为它会生成结构化对象,而不是字符串或字节的架构。

注意:JSON禁用模式的转换器实际上与处理 JSON 数据的字符串转换器具有相同的行为。 但是,如果将 StringConverter 与 Avro 数据一起使用,您最终可能会得到类似 Struct{foo=bar} 的数据,但您仍然需要 运行 在某种程度上使用 Avro 解串器,所以我不确定您是否真的可以将 Json 或 StringConverter 与 Avro 主题数据一起使用。