Kafka Connect SQL Server incremental changes from CDC

I am new to Kafka (about a week of reading and setting things up in my sandbox environment) and am trying to set up the SQL Server JDBC connector. I set up Confluent Community following the Confluent guide and installed io.debezium.connector.sqlserver.SqlServerConnector with confluent-hub. I enabled CDC on the SQL Server database and on the required table, and that part works fine.
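For reference, this is roughly how CDC was enabled, using SQL Server's built-in stored procedures. A sketch run through sqlcmd, assuming the server, database, and table names from my configs below and a login with sufficient privileges (sysadmin for the database-level step, db_owner for the table-level step):

```shell
# Enable CDC at the database level, then for the specific table.
sqlcmd -S SQL2016 -d sampledb -Q "EXEC sys.sp_cdc_enable_db;"
sqlcmd -S SQL2016 -d sampledb -Q "EXEC sys.sp_cdc_enable_table \
  @source_schema = N'dbo', @source_name = N'sampletable', @role_name = NULL;"
```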

I tried the following connectors (one at a time):

  1. io.debezium.connector.sqlserver.SqlServerConnector
  2. io.confluent.connect.jdbc.JdbcSourceConnector

Both the connector and its task load fine, show RUNNING, and report no errors, as shown below:
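This is how I checked the status, via the Kafka Connect REST API (assuming the Connect worker is listening on localhost:8083, the default port):

```shell
# Query connector and task status; a healthy connector reports
# "state": "RUNNING" for both the connector and each task.
curl -s http://localhost:8083/connectors/mssql-connector/status
```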

Here is my io.confluent.connect.jdbc.JdbcSourceConnector configuration:

{
    "name": "mssql-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "timestamp",
        "timestamp.column.name": "CreatedDateTime",
        "query": "select * from dbo.sampletable",
        "tasks.max": "1",
        "table.types": "TABLE",
        "key.converter.schemas.enable": "false",
        "topic.prefix": "data_",
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=sampledb",
        "connection.user": "kafka",
        "connection.password": "kafkaPassword@789",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "poll.interval.ms": "5000",
        "table.poll.interval.ms": "120000"
    }
}
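I registered each configuration by POSTing it to the Connect REST API. A sketch, assuming the JSON above is saved as mssql-jdbc-source.json (a filename I chose) and the worker is on localhost:8083:

```shell
# Create the connector from the JSON file; Connect responds with the
# accepted configuration, or an error if validation fails.
curl -s -X POST -H "Content-Type: application/json" \
  --data @mssql-jdbc-source.json \
  http://localhost:8083/connectors
```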

Here is my io.debezium.connector.sqlserver.SqlServerConnector configuration:

{
    "name": "mssql-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.server.name": "SQL2016",
        "database.hostname": "SQL2016",
        "database.port": "1433",
        "database.user": "kafka",
        "database.password": "kafkaPassword@789",
        "database.dbname": "sampleDb",
        "database.history.kafka.bootstrap.servers": "kafkanode1:9092",
        "database.history.kafka.topic": "schema-changes.sampleDb"
    }
}

Both connectors create a snapshot of the table in the topic (meaning they initially pull all rows), but when I make changes to the table "sampletable" (insert/update/delete), those changes are not pulled into Kafka.

Can someone help me understand how to get CDC working with Kafka?

Thanks

This seems to work 100%. I am posting the answer in case someone like me gets stuck on the JDBC source connector. The changes that made the difference for me were switching "mode" to "incrementing" with an "incrementing.column.name", and setting "validate.non.null" to false, since with a custom "query" the connector cannot verify on its own that the column is non-null.

{
    "name": "piilog-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "mode": "incrementing",
        "value.converter.schemas.enable": "false",
        "connection.url": "jdbc:sqlserver://SQL2016:1433;databaseName=SIAudit",
        "connection.user": "kafka",
        "connection.password": "kafkaPassword@789",
        "query": "select * from dbo.sampletable",
        "incrementing.column.name": "Id",
        "validate.non.null": false,
        "topic.prefix": "data_",
        "tasks.max": "1",
        "poll.interval.ms": "5000",
        "table.poll.interval.ms": "5000"
    }
}
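To verify that inserts now flow through, you can watch the output topic with the console consumer. Note that when a custom "query" is used, the JDBC source connector publishes to a topic named exactly after "topic.prefix", so here the topic is "data_" (bootstrap server taken from my Debezium config above):

```shell
# Consume the JDBC source connector's output topic from the beginning;
# new rows in dbo.sampletable should appear as JSON records.
kafka-console-consumer \
  --bootstrap-server kafkanode1:9092 \
  --topic data_ \
  --from-beginning
```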