`delete.enabled=true` 不通过 JDBC 接收器连接器删除 MySQL 中的记录
`delete.enabled=true` not deleting the record in MySQL through JDBC sink connector
我的接收器配置文件包含以下配置 -
...
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DELETESRC",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "DELETESRC",
"pk.mode": "record_key",
"pk.fields": "ID,C_NO",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "ID,C_NO"
...
我可以通过按键使用更新插入,但无法在 JDBC 接收器中使用删除模式。
我将主题 DELETESRC
配置为 cleanup.policy=compact
、delete.retention.ms=0
。
我创建了一个包含 4 列的 KSQL 流 (ID,CMP,SEG,C_NO
),并使用 KSQL 中的插入语句来推送数据。
INSERT INTO DELETESRC VALUES ('null','11','D','1','3')
INSERT INTO DELETESRC VALUES ('null','11','C','1','4')
INSERT INTO DELETESRC VALUES ('null','12','F','1','3')
但是当我执行 INSERT INTO DELETESRC VALUES ('null','11','null','null','3')
时,接收器正在将 table 更新为 11,null,null,3
。
我查看了堆栈溢出中的其他答案,但这些解决方案没有用。
我在创建墓碑记录时有没有做错?
我在 KSQL 的插入语句中尝试了其他方法,但删除操作没有发生。
为了生成正确的逻辑删除消息,您需要提供具有 null
值的键控消息。在示例中,您没有显示空值。
此外,我猜你需要增加 delete.retention.ms
:
The amount of time to retain delete tombstone markers for log
compacted topics. This setting also gives a bound on the time in which
a consumer must complete a read if they begin from offset 0 to ensure
that they get a valid snapshot of the final stage (otherwise delete
tombstones may be collected before they complete their scan).
我的接收器配置文件包含以下配置 -
...
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DELETESRC",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "DELETESRC",
"pk.mode": "record_key",
"pk.fields": "ID,C_NO",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "ID,C_NO"
...
我可以通过按键使用更新插入,但无法在 JDBC 接收器中使用删除模式。
我将主题 DELETESRC
配置为 cleanup.policy=compact
、delete.retention.ms=0
。
我创建了一个包含 4 列的 KSQL 流 (ID,CMP,SEG,C_NO
),并使用 KSQL 中的插入语句来推送数据。
INSERT INTO DELETESRC VALUES ('null','11','D','1','3')
INSERT INTO DELETESRC VALUES ('null','11','C','1','4')
INSERT INTO DELETESRC VALUES ('null','12','F','1','3')
但是当我执行 INSERT INTO DELETESRC VALUES ('null','11','null','null','3')
时,接收器正在将 table 更新为 11,null,null,3
。
我查看了堆栈溢出中的其他答案,但这些解决方案没有用。
我在创建墓碑记录时有没有做错?
我在 KSQL 的插入语句中尝试了其他方法,但删除操作没有发生。
为了生成正确的逻辑删除消息,您需要提供具有 null
值的键控消息。在示例中,您没有显示空值。
此外,我猜你需要增加 delete.retention.ms
:
The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).