Kafka 连接数据库源连接器:如何从外键复制数据
Kafka connect database source connector : how to copy data from foreign key
我正在使用数据库源连接器将数据从我的 Postgres 数据库 table 移动到 Kafka 主题。我有一个订单 table 使用 customerNumber 字段与客户 table 有一个外键。
下面是将订单复制到 Kafka 但没有客户数据到 JSON 的连接器。我正在寻找如何将客户订单的完整对象构造成 JSON.
连接器是:
{
"name": "SOURCE_CONNECTOR",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"connection.password": "postgres_pwd",
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "amount:float64",
"tasks.max": "1",
"transforms": "cast,createKey,extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"batch.max.rows": "25",
"table.whitelist": "orders",
"mode": "bulk",
"topic.prefix": "data_",
"transforms.extractInt.field": "uuid",
"connection.user": "postgres_user",
"transforms.createKey.fields": "uuid",
"poll.interval.ms": "3600000",
"sql.quote.identifiers": "false",
"name": "SOURCE_CONNECTOR",
"numeric.mapping": "best_fit",
"connection.url": "url"
}
}
您可以使用 Query-based ingest. Just specify query
config 选项。
我正在使用数据库源连接器将数据从我的 Postgres 数据库 table 移动到 Kafka 主题。我有一个订单 table 使用 customerNumber 字段与客户 table 有一个外键。
下面是将订单复制到 Kafka 但没有客户数据到 JSON 的连接器。我正在寻找如何将客户订单的完整对象构造成 JSON.
连接器是:
{
"name": "SOURCE_CONNECTOR",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"connection.password": "postgres_pwd",
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "amount:float64",
"tasks.max": "1",
"transforms": "cast,createKey,extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"batch.max.rows": "25",
"table.whitelist": "orders",
"mode": "bulk",
"topic.prefix": "data_",
"transforms.extractInt.field": "uuid",
"connection.user": "postgres_user",
"transforms.createKey.fields": "uuid",
"poll.interval.ms": "3600000",
"sql.quote.identifiers": "false",
"name": "SOURCE_CONNECTOR",
"numeric.mapping": "best_fit",
"connection.url": "url"
}
}
您可以使用 Query-based ingest. Just specify query
config 选项。