illegal_argument_exception 在使用 kafka 连接器推送到 elasticsearch 时
illegal_argument_exception while using kafka connector to push to elasticsearch
我正在尝试将日志从 kafka tops 推送到 elasticsearch。
我在kafka中的消息:
{
"@timestamp": 1589549688.659166,
"log": "13:34:48.658 [pool-2-thread-1] DEBUG health check success",
"stream": "stdout",
"time": "2020-05-15T13:34:48.659166158Z",
"pod_name": "my-pod-789f8c85f4-mt62l",
"namespace_name": "services",
"pod_id": "600ca012-91f5-XXXX-XXXX-XXXXXXXXXXX",
"host": "ip-192-168-88-59.ap-south-1.compute.internal",
"container_name": "my-pod",
"docker_id": "XXXXXXXXXXXXXXXXX1435bb2870bfc9d20deb2c483ce07f8e71ec",
"container_hash": "myregistry",
"labelpod-template-hash": "9tignfe9r",
"labelsecurity.istio.io/tlsMode": "istio",
"labelservice": "my-pod",
"labelservice.istio.io/canonical-name": "my-pod",
"labelservice.istio.io/canonical-revision": "latest",
"labeltype": "my-pod",
"annotationkubernetes.io/psp": "eks.privileged",
"annotationsidecar.istio.io/status": "{\"version\":\"58dc8b12bb311f1e2f46fd56abfe876ac96a38d7ac3fc6581af3598ccca7522f\"}"
}
这是我的连接器配置:
{
"name": "logs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://es:9200",
"connection.username": "username",
"connection.password": "password",
"tasks.max": "10",
"topics": "my-pod",
"name": "logs",
"type.name": "_doc",
"schema.ignore": "true",
"key.ignore": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "routeTS",
"transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format": "${topic}-${timestamp}",
"transforms.routeTS.timestamp.format": "YYYYMMDD"
}
}
这是我遇到的错误
cp-kafka-connect-server [2020-05-15 13:30:59,083] WARN Failed to execute batch 4830 of 18 records with attempt 4/6, will attempt retry after 539 ms. Failure reason: Bulk request failed: [{"type":"illegal_argument_exception","reason":"mapper [labelservice] of different type, current_type [text], merged_type [ObjectMapper]"}
我没有事先创建任何映射。我依赖于连接器来创建索引。
这是我在 es 中自动创建的映射。
{
"mapping": {}
}
错误信息很清楚
reason":"mapper [labelservice] of different type, current_type [text],
merged_type [ObjectMapper]"
这意味着在您的索引映射中 labelservice
被定义为 text
但您在 labelservice
字段中发送以下数据:
"labelservice": "my-pod",
"labelservice.istio.io/canonical-name": "my-pod",
"labelservice.istio.io/canonical-revision": "latest",
这是 Elasticsearch 中对象类型的格式,现在数据类型不匹配导致了错误消息。
您需要更改映射并将 labelservice
定义为对象才能使其正常工作。有关详细信息,请参阅 object datatype in Elasticsearch。
我正在尝试将日志从 kafka tops 推送到 elasticsearch。
我在kafka中的消息:
{
"@timestamp": 1589549688.659166,
"log": "13:34:48.658 [pool-2-thread-1] DEBUG health check success",
"stream": "stdout",
"time": "2020-05-15T13:34:48.659166158Z",
"pod_name": "my-pod-789f8c85f4-mt62l",
"namespace_name": "services",
"pod_id": "600ca012-91f5-XXXX-XXXX-XXXXXXXXXXX",
"host": "ip-192-168-88-59.ap-south-1.compute.internal",
"container_name": "my-pod",
"docker_id": "XXXXXXXXXXXXXXXXX1435bb2870bfc9d20deb2c483ce07f8e71ec",
"container_hash": "myregistry",
"labelpod-template-hash": "9tignfe9r",
"labelsecurity.istio.io/tlsMode": "istio",
"labelservice": "my-pod",
"labelservice.istio.io/canonical-name": "my-pod",
"labelservice.istio.io/canonical-revision": "latest",
"labeltype": "my-pod",
"annotationkubernetes.io/psp": "eks.privileged",
"annotationsidecar.istio.io/status": "{\"version\":\"58dc8b12bb311f1e2f46fd56abfe876ac96a38d7ac3fc6581af3598ccca7522f\"}"
}
这是我的连接器配置:
{
"name": "logs",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://es:9200",
"connection.username": "username",
"connection.password": "password",
"tasks.max": "10",
"topics": "my-pod",
"name": "logs",
"type.name": "_doc",
"schema.ignore": "true",
"key.ignore": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "routeTS",
"transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format": "${topic}-${timestamp}",
"transforms.routeTS.timestamp.format": "YYYYMMDD"
}
}
这是我遇到的错误
cp-kafka-connect-server [2020-05-15 13:30:59,083] WARN Failed to execute batch 4830 of 18 records with attempt 4/6, will attempt retry after 539 ms. Failure reason: Bulk request failed: [{"type":"illegal_argument_exception","reason":"mapper [labelservice] of different type, current_type [text], merged_type [ObjectMapper]"}
我没有事先创建任何映射。我依赖于连接器来创建索引。 这是我在 es 中自动创建的映射。
{
"mapping": {}
}
错误信息很清楚
reason":"mapper [labelservice] of different type, current_type [text], merged_type [ObjectMapper]"
这意味着在您的索引映射中 labelservice
被定义为 text
但您在 labelservice
字段中发送以下数据:
"labelservice": "my-pod",
"labelservice.istio.io/canonical-name": "my-pod",
"labelservice.istio.io/canonical-revision": "latest",
这是 Elasticsearch 中对象类型的格式,现在数据类型不匹配导致了错误消息。
您需要更改映射并将 labelservice
定义为对象才能使其正常工作。有关详细信息,请参阅 object datatype in Elasticsearch。