ElasticsearchSinkConnector 对象映射不能从嵌套更改为非嵌套

ElasticsearchSinkConnector object mapping can't be changed from nested to non-nested

我正在将我的数据从 kafka 主题流式传输到 elasticsearch。但是它从连接器 {\"type\":\"illegal_argument_exception\",\"reason\":\"object mapping [search_data] can't be changed from nested to non-nested\"}

抛出这个错误

但是当我从主题中获取消息并使用 elasticsearch api 手动添加文档时,它工作正常。

kafka-connect-elasticsearch不支持嵌套对象类型吗?

请帮我回复一下,因为我被困在这里好几天了。

Elasticsearch 版本:7.6.2

卡夫卡连接图片:confluentinc/cp-kafka-connect:5.4.2

以下是我的连接器配置。

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "es_sink_products",
    "key.ignore": "false",
    "schema.ignore": "true",
    "connection.url": "localhost:9200",
    "type.name": "kafka-connect",
    "name": "product-elasticsearch-sink",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

弹性搜索模式

{
  "mappings": {
    "properties": {
      "search_result_data": {
        "properties": {
          "product_id": {"type": "long"},
          "product_name": {"type": "text"},
        }
      },
      "search_data":{
        "type": "nested",
        "include_in_parent": false,
        "properties": {
          "product_id": {"type": "long"},
          "full_text": {
            "type": "text",
          },
        }
      }
    }
  }
}

来自主题 es_sink_products

的示例消息
{
    "search_data": {
        "product_id": 1,
        "full_text": "Product 1"
    },
    "search_result_data": {
        "product_id": 1,
        "product_name": "Product Name 1"
    }
}

这是来自连接器的完整错误 "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{\"type\":\"illegal_argument_exception\",\"reason\":\"object mapping [search_data] can't be changed from nested to non-nested\"}]\n\tat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.handleMalformedDoc(BulkProcessor.java:479)\n\tat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:433)\n\tat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)\n\tat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n\tat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)\n"

如果之前设置了非嵌套映射并且您正尝试使用嵌套类型更新该映射,则会引发此错误。

您现在可以做的是:

  1. 删除索引
  2. 设置 nested 映射(您在上面称为 Elasticsearch Schema )一次
  3. 使用选项 "schema.ignore": "false"
  4. 启动 kafka 流

原因:由于有效负载的索引方式,将非嵌套更改为 nested 被视为 'breaking change'。

问题出在 kafka 连接器配置中的 type.name。 Elasticsearch 默认类型是 _doc。由于我使用 kafka-connect 作为类型,elasticsearch 假设我想添加另一种文档类型并与现有的 _doc 嵌套类型发生冲突。

通过更改连接器配置的 "type.name": "_doc" 解决了这个问题。