Kafka JDBC Source connector: create topics from column values
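I have a microservice that uses OracleDB to publish system changes to the EVENT_STORE table. The EVENT_STORE table contains a column TYPE that holds the name of the event type.
Is it possible for JDBC Source Kafka Connect to pick up changes to the EVENT_STORE table and publish each one to the Kafka topic named by the value of the TYPE column?
This is my source Kafka connector configuration: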
{
  "name": "kafka-connector-source-ms-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@localhost:1521:xe",
    "connection.user": "squeme-name",
    "connection.password": "password",
    "topic.prefix": "",
    "table.whitelist": "EVENT_STORE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "CREATE_AT",
    "incrementing.column.name": "ID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.retry.timeout": "0",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "none",
    "errors.log.enable": "false",
    "errors.log.include.messages": "false",
    "connection.attempts": "3",
    "connection.backoff.ms": "10000",
    "numeric.precision.mapping": "false",
    "validate.non.null": "true",
    "quote.sql.identifiers": "ALWAYS",
    "table.types": "TABLE",
    "poll.interval.ms": "5000",
    "batch.max.rows": "100",
    "table.poll.interval.ms": "60000",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "UTC"
  }
}
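You can try the ExtractTopic transformation, which takes the topic name from a field of the record.
Add the following properties to the connector's JSON configuration (written here in properties form):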
transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=TYPE
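Since the connector configuration in the question is JSON, the same three properties would go inside the "config" object as string key/value pairs. A minimal sketch, assuming the Confluent connect-transforms plugin that provides ExtractTopic is installed on the Connect worker; only the transform entries are shown, and the rest of the "config" object stays as in the question:

```json
{
  "transforms": "ValueFieldExample",
  "transforms.ValueFieldExample.type": "io.confluent.connect.transforms.ExtractTopic$Value",
  "transforms.ValueFieldExample.field": "TYPE"
}
```

With this in place, each record's topic should be replaced by the value of its TYPE field, so the target topics need to exist already or topic auto-creation has to be enabled on the brokers.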