`delete.enabled=true` 不通过 JDBC 接收器连接器删除 MySQL 中的记录

`delete.enabled=true` not deleting the record in MySQL through JDBC sink connector

我的接收器配置文件包含以下配置 -

...
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DELETESRC",
"insert.mode": "upsert",
"batch.size": "50000",
"table.name.format": "DELETESRC",
"pk.mode": "record_key",
"pk.fields": "ID,C_NO",
"delete.enabled": "true",
"auto.create": "true",
"auto.evolve": "true",
"max.retries": "10",
"retry.backoff.ms": "3000",
"mode": "bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://localhost:8081",
"transforms": "ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "ID,C_NO"
...

我可以通过按键使用更新插入,但无法在 JDBC 接收器中使用删除模式。 我将主题 DELETESRC 配置为 cleanup.policy=compactdelete.retention.ms=0。 我创建了一个包含 4 列的 KSQL 流 (ID,CMP,SEG,C_NO),并使用 KSQL 中的插入语句来推送数据。

INSERT INTO DELETESRC VALUES ('null','11','D','1','3')
INSERT INTO DELETESRC VALUES ('null','11','C','1','4')
INSERT INTO DELETESRC VALUES ('null','12','F','1','3')

但是当我执行 INSERT INTO DELETESRC VALUES ('null','11','null','null','3') 时,接收器正在将 table 更新为 11,null,null,3。 我查看了堆栈溢出中的其他答案,但这些解决方案没有用。

我在创建墓碑记录时有没有做错?

我在 KSQL 的插入语句中尝试了其他方法,但删除操作没有发生。

为了生成正确的逻辑删除消息,您需要提供具有 null 值的键控消息。在示例中,您没有显示空值。


此外,我猜你需要增加 delete.retention.ms:

The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).