Kafka Connect Elasticsearch Document ID Creation
I am currently using the following connector configuration, but I get the exception "Key is used as document id and can not be null":
{
  "name": "hello7",
  "config": {
    "name": "hello7",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "hello7",
    "connection.url": "http://127.0.0.1:8080/",
    "type.name": "aggregator",
    "schema.ignore": "true",
    "topic.schema.ignore": "true",
    "topic.key.ignore": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "key.ignore": "false",
    "transforms": "extractKey",
    "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.InsertKey.fields": "customerId",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "customerId",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
I am sending the following message to the topic:
{
  "customerId": "i7y32o4823",
  "customerName": "JOE",
  "address": "123 main street",
  "employee": "ABC Company",
  "employeeAddress": "178 Main Street"
}
I get the following error:
[2020-01-17 16:28:33,624] ERROR WorkerSinkTask{id=hello7-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Key is used as document id and can not be null.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:79)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:160)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:285)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:270)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)
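You have set "key.ignore": "false", but you only mention the value you are sending to the Kafka topic.
Kafka records have both a key and a value. If you do not specify a key, it will be null.
The Elasticsearch sink connector does not accept a null key when the key is used as the document ID, as the error states:
ConnectException: Key is used as document id and can not be null
Also, your transform chain only runs extractKey; InsertKey is defined but never used:
"transforms": "extractKey",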
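One possible fix, assuming the goal is to use the customerId field of the message value as the document ID, is to list both transforms in order. InsertKey (ValueToKey) first copies customerId from the value into the key, and extractKey (ExtractField$Key) then unwraps that single-field key into a plain value. This is only a sketch of the transform entries; the rest of the configuration is assumed to stay as in the question:

```json
{
  "transforms": "InsertKey,extractKey",
  "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.InsertKey.fields": "customerId",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "customerId"
}
```

Transforms run in the order they are listed, so InsertKey has to come before extractKey. With this chain, the sample message should end up with the key "i7y32o4823".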
You can debug the connector with a standalone FileStreamSinkConnector that writes to the console:
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=hello7