Kafka Connect Elasticsearch sink 没有文档被索引
Kafka Connect Elasticsearch sink no documents are indexed
我正在尝试设置一个测试以将数据从 MySQL 移动到 Elasticsearch。
我有一个 docker 化的设置,包括 broker、zookeeper、connect、ksql server 和 cli、schema registry 和 Elasticsearch。我正在使用融合版本 5.1.0 中的 docker 图像,对于 Elasticsearch,我正在使用 elasticsearch:6.5.4
我配置了一个 JDBC 连接器 以从 MySQL 获取数据到 Kafka,这是有效的 我看到我的主题创建并使用 ksql-cli 我当我更新 MySQL.
中的行时,可以看到流中的新消息
我还配置了一个 Elasticsearch sink connector connector 创建成功,Elasticsearch 中的索引也在那里,但是我在我的 Elasticsearch 索引中看到 no documents .
这是 ES 接收器连接器配置:
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "http://es:9200",
"type.name": "_doc",
"topics": "test_topic",
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
}
}
这是我查询接收器连接器状态时看到的内容:curl -X GET http://connect:8083/connectors/es-connector
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}
在 Elasticsearch 中我可以看到索引 http://es:9200/test_topic/_search
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
我一直在 MySQL 中进行更新和插入,我使用 ksql-cli 在流中看到了消息,但 Elasticsearch 中没有创建任何文档。我什至使用 kafka-avro-console-producer
手动创建了一个主题并发布了消息,然后为该主题创建了第二个接收器连接器,结果相同,我看到了索引但没有文档。
我在 kafka-connect 中没有看到任何错误,所以我不明白为什么不起作用。连接器配置有问题吗?我错过了什么吗?
编辑:
对于 Elasticsearch 接收器配置,我尝试使用和不使用这些行:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
和结果一样,没有文件。
编辑
我发现错误:
Key is used as document id and cannot be null
。
与
"key.ignore": true
Elasticsearch 接收器将使用主题+分区+偏移量作为 Elasticsearch 文档 ID。如您所见,您会为 每 条消息获得一个新文档。
与
"key.ignore": false
Elasticsearch 接收器将使用 Kafka 消息的 Key 作为 Elasticsearch 文档 ID。如果您的 Kafka 消息中没有密钥,您将收到错误 Key is used as document id and cannot be null
,这是可以理解的。您可以使用各种方法在 Kafka 消息中设置密钥,如果您通过 Kafka Connect 接收,包括使用单消息转换来设置 Kafka 消息密钥,detailed here。
我正在尝试设置一个测试以将数据从 MySQL 移动到 Elasticsearch。
我有一个 docker 化的设置,包括 broker、zookeeper、connect、ksql server 和 cli、schema registry 和 Elasticsearch。我正在使用融合版本 5.1.0 中的 docker 图像,对于 Elasticsearch,我正在使用 elasticsearch:6.5.4
我配置了一个 JDBC 连接器 以从 MySQL 获取数据到 Kafka,这是有效的 我看到我的主题创建并使用 ksql-cli 我当我更新 MySQL.
中的行时,可以看到流中的新消息我还配置了一个 Elasticsearch sink connector connector 创建成功,Elasticsearch 中的索引也在那里,但是我在我的 Elasticsearch 索引中看到 no documents .
这是 ES 接收器连接器配置:
{
"name": "es-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "http://es:9200",
"type.name": "_doc",
"topics": "test_topic",
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
}
}
这是我查询接收器连接器状态时看到的内容:curl -X GET http://connect:8083/connectors/es-connector
{
"name": "es-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "connect:8083"
}
],
"type": "sink"
}
在 Elasticsearch 中我可以看到索引 http://es:9200/test_topic/_search
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
我一直在 MySQL 中进行更新和插入,我使用 ksql-cli 在流中看到了消息,但 Elasticsearch 中没有创建任何文档。我什至使用 kafka-avro-console-producer
手动创建了一个主题并发布了消息,然后为该主题创建了第二个接收器连接器,结果相同,我看到了索引但没有文档。
我在 kafka-connect 中没有看到任何错误,所以我不明白为什么不起作用。连接器配置有问题吗?我错过了什么吗?
编辑:
对于 Elasticsearch 接收器配置,我尝试使用和不使用这些行:
"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true
和结果一样,没有文件。
编辑
我发现错误:
Key is used as document id and cannot be null
。
与
"key.ignore": true
Elasticsearch 接收器将使用主题+分区+偏移量作为 Elasticsearch 文档 ID。如您所见,您会为 每 条消息获得一个新文档。
与
"key.ignore": false
Elasticsearch 接收器将使用 Kafka 消息的 Key 作为 Elasticsearch 文档 ID。如果您的 Kafka 消息中没有密钥,您将收到错误 Key is used as document id and cannot be null
,这是可以理解的。您可以使用各种方法在 Kafka 消息中设置密钥,如果您通过 Kafka Connect 接收,包括使用单消息转换来设置 Kafka 消息密钥,detailed here。