弹性搜索 - Kafka Connect 未将正确的密钥值发布为文档 ID
Elastic search - Kafka Connect not publishing correct value of key as document id
我正在尝试使用 Kafka connect 将数据从 kafka 主题发布到 Elastic search。
下面是我的配置。
{
"name": "elasticsearch_sink_19",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "k-connect-status",
"connection.url": "http://docker.for.mac.host.internal:9200",
"type.name": "connectstatus",
"behavior.on.malformed.documents": "ignore",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"validate.non.null": "false",
"key.ignore":"true",
"schema.ignore":"true",
"value.converter.schemas.enable": "false"
}
}
"key.ignore" true 正在使用错误的 _id(不是预期的 _id)发布数据。
来自主题的数据:
格式:JSON
{
"ROWTIME":1591743374742,
"ROWKEY":"status-connector-elasticsearch_sink_31",
"state":"RUNNING",
"trace":null,
"worker_id":"connect:8083",
"generation":2
}
示例弹性搜索输出:
{
"_index" : "k-connect-status",
"_type" : "connectstatus",
"_id" : "k-connect-status+1+17",
"_score" : 1.0,
"_source" : {
"generation" : 11,
"trace" : null,
"state" : "UNASSIGNED",
"worker_id" : "connect:8083"
}
}
预期弹性搜索输出
{
"_index" : "k-connect-status",
"_type" : "connectstatus",
"_id" : "status-connector-elasticsearch_sink_31",
"_score" : 1.0,
"_source" : {
"generation" : 11,
"trace" : null,
"state" : "UNASSIGNED",
"worker_id" : "connect:8083"
}
}
ROWKEY 作为弹性搜索的_id。至少 ROWKEY 附加主题名称。
"key.ignore" false 未发布任何数据。
"key.ignore":"true",
正在做它应该做的事情。它忽略了 Kafka 消息的键,而是使用主题+分区+偏移量的元组,这是您在引用的 Elasticsearch 输出中可以看到的:
"_id" : "k-connect-status+1+17",
如果要使用Kafka消息的key需要设置"key.ignore":"false"
Elasticsearch Sink Connector 默认情况下通过连接主题、分区和偏移量来创建文档 _id。它发生在你的情况下。
您应该在 false
上设置 key.ignore
并提供正确的密钥。
如果您需要从消息值中提取一些信息并将其放入键中,您应该使用适当的Transformation. I think you can try with ValueToKey。
我正在尝试使用 Kafka connect 将数据从 kafka 主题发布到 Elastic search。 下面是我的配置。
{
"name": "elasticsearch_sink_19",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "k-connect-status",
"connection.url": "http://docker.for.mac.host.internal:9200",
"type.name": "connectstatus",
"behavior.on.malformed.documents": "ignore",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"validate.non.null": "false",
"key.ignore":"true",
"schema.ignore":"true",
"value.converter.schemas.enable": "false"
}
}
"key.ignore" true 正在使用错误的 _id(不是预期的 _id)发布数据。
来自主题的数据:
格式:JSON
{
"ROWTIME":1591743374742,
"ROWKEY":"status-connector-elasticsearch_sink_31",
"state":"RUNNING",
"trace":null,
"worker_id":"connect:8083",
"generation":2
}
示例弹性搜索输出:
{
"_index" : "k-connect-status",
"_type" : "connectstatus",
"_id" : "k-connect-status+1+17",
"_score" : 1.0,
"_source" : {
"generation" : 11,
"trace" : null,
"state" : "UNASSIGNED",
"worker_id" : "connect:8083"
}
}
预期弹性搜索输出
{
"_index" : "k-connect-status",
"_type" : "connectstatus",
"_id" : "status-connector-elasticsearch_sink_31",
"_score" : 1.0,
"_source" : {
"generation" : 11,
"trace" : null,
"state" : "UNASSIGNED",
"worker_id" : "connect:8083"
}
}
ROWKEY 作为弹性搜索的_id。至少 ROWKEY 附加主题名称。
"key.ignore" false 未发布任何数据。
"key.ignore":"true",
正在做它应该做的事情。它忽略了 Kafka 消息的键,而是使用主题+分区+偏移量的元组,这是您在引用的 Elasticsearch 输出中可以看到的:
"_id" : "k-connect-status+1+17",
如果要使用Kafka消息的key需要设置"key.ignore":"false"
Elasticsearch Sink Connector 默认情况下通过连接主题、分区和偏移量来创建文档 _id。它发生在你的情况下。
您应该在 false
上设置 key.ignore
并提供正确的密钥。
如果您需要从消息值中提取一些信息并将其放入键中,您应该使用适当的Transformation. I think you can try with ValueToKey。