Kafka Connect Elasticsearch sink 没有文档被索引

Kafka Connect Elasticsearch sink no documents are indexed

我正在尝试设置一个测试以将数据从 MySQL 移动到 Elasticsearch。

我有一个 docker 化的设置,包括 broker、zookeeper、connect、ksql server 和 cli、schema registry 和 Elasticsearch。我正在使用融合版本 5.1.0 中的 docker 图像,对于 Elasticsearch,我正在使用 elasticsearch:6.5.4

我配置了一个 JDBC 连接器 以从 MySQL 获取数据到 Kafka,这是有效的 我看到我的主题创建并使用 ksql-cli 我当我更新 MySQL.

中的行时,可以看到流中的新消息

我还配置了一个 Elasticsearch sink connector connector 创建成功,Elasticsearch 中的索引也在那里,但是我在我的 Elasticsearch 索引中看到 no documents .

这是 ES 接收器连接器配置:

{
    "name": "es-connector",
    "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.url": "http://es:9200",
            "type.name": "_doc",
            "topics": "test_topic",
            "drop.invalid.message": true,
            "behavior.on.null.values": "ignore",
            "behavior.on.malformed.documents": "ignore",
            "schema.ignore": true
    }
}

这是我查询接收器连接器状态时看到的内容:curl -X GET http://connect:8083/connectors/es-connector

{
    "name": "es-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "state": "RUNNING",
            "id": 0,
            "worker_id": "connect:8083"
        }
    ],
    "type": "sink"
}

在 Elasticsearch 中我可以看到索引 http://es:9200/test_topic/_search

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

我一直在 MySQL 中进行更新和插入,我使用 ksql-cli 在流中看到了消息,但 Elasticsearch 中没有创建任何文档。我什至使用 kafka-avro-console-producer 手动创建了一个主题并发布了消息,然后为该主题创建了第二个接收器连接器,结果相同,我看到了索引但没有文档。

我在 kafka-connect 中没有看到任何错误,所以我不明白为什么不起作用。连接器配置有问题吗?我错过了什么吗?

编辑:

对于 Elasticsearch 接收器配置,我尝试使用和不使用这些行:

"drop.invalid.message": true,
"behavior.on.null.values": "ignore",
"behavior.on.malformed.documents": "ignore",
"schema.ignore": true

和结果一样,没有文件。

编辑

我发现错误:

Key is used as document id and cannot be null

"key.ignore": true

Elasticsearch 接收器将使用主题+分区+偏移量作为 Elasticsearch 文档 ID。如您所见,您会为 条消息获得一个新文档。

"key.ignore": false

Elasticsearch 接收器将使用 Kafka 消息的 Key 作为 Elasticsearch 文档 ID。如果您的 Kafka 消息中没有密钥,您将收到错误 Key is used as document id and cannot be null,这是可以理解的。您可以使用各种方法在 Kafka 消息中设置密钥,如果您通过 Kafka Connect 接收,包括使用单消息转换来设置 Kafka 消息密钥,detailed here