kafka connect sql 来自 cdc 的服务器增量更改
kafka connect sql server incremental changes from cdc
我是 Kafka 的新手(从一周开始在我的沙盒环境中阅读和设置)并尝试设置 SQL 服务器 JDBC 连接器。
我已经按照 confluent 指南设置了 Confluent 社区,并使用 confluent-hub 安装了 io.debezium.connector.sqlserver.SqlServerConnector
我在 SQL 服务器数据库上启用了 CDC 并需要 table,它工作正常。
我尝试了以下连接器(一次一个):
- io.debezium.connector.sqlserver.SqlServerConnector
- io.confluent.connect.jdbc.JdbcSourceConnector
连接器和任务的状态均正常加载运行正常,没有错误,如下所示:
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 配置
{
"name": "mssql-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "CreatedDateTime",
"query": "select * from dbo.sampletable",
"tasks.max": "1",
"table.types": "TABLE",
"key.converter.schemas.enable": "false",
"topic.prefix": "data_",
"value.converter.schemas.enable": "false",
"connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=sampledb",
"connection.user": "kafka",
"connection.password": "kafkaPassword@789",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"poll.interval.ms": "5000",
"table.poll.interval.ms": "120000"
}
}
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 连接器
{
"name": "mssql-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "SQL2016",
"database.hostname" : "SQL2016",
"database.port" : "1433",
"database.user" : "kafka",
"database.password" : "kafkaPassword@789",
"database.dbname" : "sampleDb",
"database.history.kafka.bootstrap.servers" : "kafkanode1:9092",
"database.history.kafka.topic": "schema-changes.sampleDb"
}
}
两个连接器都在主题中创建 table 的快照(意味着它最初拉取所有行)
但是当我对 table "sampletable" (insert/update/delete) 进行更改时,这些更改不会被拉到 kafka。
有人可以帮我了解如何让 CDC 与 Kafka 一起工作吗?
谢谢
这似乎 100% 有效。我发布答案是为了防止像我这样的人卡在 jdbc 源连接器上。
{
"name": "piilog-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"value.converter.schemas.enable": "false",
"connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=SIAudit",
"connection.user": "kafka",
"connection.password": "kafkaPassword@789",
"query": "select * from dbo.sampletable",
"incrementing.column.name": "Id",
"validate.non.null": false,
"topic.prefix": "data_",
"tasks.max": "1",
"poll.interval.ms": "5000",
"table.poll.interval.ms": "5000"
}
}
我是 Kafka 的新手(从一周开始在我的沙盒环境中阅读和设置)并尝试设置 SQL 服务器 JDBC 连接器。 我已经按照 confluent 指南设置了 Confluent 社区,并使用 confluent-hub 安装了 io.debezium.connector.sqlserver.SqlServerConnector 我在 SQL 服务器数据库上启用了 CDC 并需要 table,它工作正常。
我尝试了以下连接器(一次一个):
- io.debezium.connector.sqlserver.SqlServerConnector
- io.confluent.connect.jdbc.JdbcSourceConnector
连接器和任务的状态均正常加载运行正常,没有错误,如下所示:
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 配置
{
"name": "mssql-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "timestamp",
"timestamp.column.name": "CreatedDateTime",
"query": "select * from dbo.sampletable",
"tasks.max": "1",
"table.types": "TABLE",
"key.converter.schemas.enable": "false",
"topic.prefix": "data_",
"value.converter.schemas.enable": "false",
"connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=sampledb",
"connection.user": "kafka",
"connection.password": "kafkaPassword@789",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"poll.interval.ms": "5000",
"table.poll.interval.ms": "120000"
}
}
这是我的 io.confluent.connect.jdbc.JdbcSourceConnector 连接器
{
"name": "mssql-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "SQL2016",
"database.hostname" : "SQL2016",
"database.port" : "1433",
"database.user" : "kafka",
"database.password" : "kafkaPassword@789",
"database.dbname" : "sampleDb",
"database.history.kafka.bootstrap.servers" : "kafkanode1:9092",
"database.history.kafka.topic": "schema-changes.sampleDb"
}
}
两个连接器都在主题中创建 table 的快照(意味着它最初拉取所有行) 但是当我对 table "sampletable" (insert/update/delete) 进行更改时,这些更改不会被拉到 kafka。
有人可以帮我了解如何让 CDC 与 Kafka 一起工作吗?
谢谢
这似乎 100% 有效。我发布答案是为了防止像我这样的人卡在 jdbc 源连接器上。
{
"name": "piilog-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"value.converter.schemas.enable": "false",
"connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=SIAudit",
"connection.user": "kafka",
"connection.password": "kafkaPassword@789",
"query": "select * from dbo.sampletable",
"incrementing.column.name": "Id",
"validate.non.null": false,
"topic.prefix": "data_",
"tasks.max": "1",
"poll.interval.ms": "5000",
"table.poll.interval.ms": "5000"
}
}