弹性搜索 - Kafka Connect 未将正确的密钥值发布为文档 ID

Elastic search - Kafka Connect not publishing correct value of key as document id

我正在尝试使用 Kafka connect 将数据从 kafka 主题发布到 Elastic search。 下面是我的配置。

{
  "name": "elasticsearch_sink_19",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "k-connect-status",
    "connection.url": "http://docker.for.mac.host.internal:9200",
    "type.name": "connectstatus",
    "behavior.on.malformed.documents": "ignore",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter", 
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "validate.non.null": "false",
    "key.ignore":"true",
    "schema.ignore":"true",
    "value.converter.schemas.enable": "false"
  }
}

"key.ignore" true 正在使用错误的 _id(不是预期的 _id)发布数据。

来自主题的数据:

格式:JSON

{
  "ROWTIME":1591743374742,
  "ROWKEY":"status-connector-elasticsearch_sink_31",
  "state":"RUNNING",
  "trace":null,
  "worker_id":"connect:8083",
  "generation":2
}

示例弹性搜索输出:

{
  "_index" : "k-connect-status",
  "_type" : "connectstatus",
  "_id" : "k-connect-status+1+17",
  "_score" : 1.0,
  "_source" : {
    "generation" : 11,
    "trace" : null,
    "state" : "UNASSIGNED",
    "worker_id" : "connect:8083"
  }
}

预期弹性搜索输出

{
  "_index" : "k-connect-status",
  "_type" : "connectstatus",
  "_id" : "status-connector-elasticsearch_sink_31",
  "_score" : 1.0,
  "_source" : {
    "generation" : 11,
    "trace" : null,
    "state" : "UNASSIGNED",
    "worker_id" : "connect:8083"
  }
}

ROWKEY 作为弹性搜索的_id。至少 ROWKEY 附加主题名称。

"key.ignore" false 未发布任何数据。

"key.ignore":"true", 正在做它应该做的事情。它忽略了 Kafka 消息的键,而是使用主题+分区+偏移量的元组,这是您在引用的 Elasticsearch 输出中可以看到的:

  "_id" : "k-connect-status+1+17",

如果要使用Kafka消息的key需要设置"key.ignore":"false"

查看本教程以了解更多信息并查看键的解释 video / code

Elasticsearch Sink Connector 默认情况下通过连接主题、分区和偏移量来创建文档 _id。它发生在你的情况下。 您应该在 false 上设置 key.ignore 并提供正确的密钥。

如果您需要从消息值中提取一些信息并将其放入键中,您应该使用适当的Transformation. I think you can try with ValueToKey