尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题
Trying to index kafka topic in Elasticsearch with Kafka Connect
我想将 avro 中的 kafka 主题索引到 elasticsearch 格式,但是
我的时间戳字段无法被识别
elasticsearch 作为日期格式字段。
我对连接器使用了以下配置。
{
"name": "es-sink-barchart-10",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "http://localhost:9200",
"type.name":"type.name=kafka-connect",
"topics": "exchange_avro_01",
"topic.index.map": "exchange_avro_01:exchange_barchart",
"key.ignore": "true"
}
}
原始字段是bigint类型,我希望目标字段是date类型,使用elasticsearch任何有效格式。我已经定义了一个动态模板来尝试通过以下方式解决它:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
"index_patterns": "exchange*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"kafka-connect": {
"dynamic_templates": [
{
"dates": {
"match_mapping_type": "long",
"match": "TIME",
"mapping": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
]
,
"properties": {
"CLOSE": {
"type": "double"
},
.
.
.
}
}
}
}
}'
当我加载上述连接器时,没有任何内容被索引到 elasticsearch。
有什么帮助吗?
如果您的来源是 bigint,那么大概是一个纪元。如果它是一个纪元,那么这将不起作用:
"mapping": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
因为您告诉 Elasticsearch 日期格式是 yyyy-MM-dd HH:mm:ss
(实际上不是)。
所以,试试这个(暂时省略您的自定义映射;先让它工作,然后再添加回去):
{
"index_patterns": "exchange*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"kafka-connect": {
"dynamic_templates": [
{
"dates": {
"match": "TIME",
"mapping": {
"type": "date"
} } } ] } } }
nothing is indexed to elasticsearch.
检查 Kafka Connect worker 日志和 Elasticsearch 日志是否有任何错误。
我想将 avro 中的 kafka 主题索引到 elasticsearch 格式,但是 我的时间戳字段无法被识别 elasticsearch 作为日期格式字段。
我对连接器使用了以下配置。
{
"name": "es-sink-barchart-10",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "http://localhost:9200",
"type.name":"type.name=kafka-connect",
"topics": "exchange_avro_01",
"topic.index.map": "exchange_avro_01:exchange_barchart",
"key.ignore": "true"
}
}
原始字段是bigint类型,我希望目标字段是date类型,使用elasticsearch任何有效格式。我已经定义了一个动态模板来尝试通过以下方式解决它:
curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
"index_patterns": "exchange*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"kafka-connect": {
"dynamic_templates": [
{
"dates": {
"match_mapping_type": "long",
"match": "TIME",
"mapping": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
}
}
]
,
"properties": {
"CLOSE": {
"type": "double"
},
.
.
.
}
}
}
}
}'
当我加载上述连接器时,没有任何内容被索引到 elasticsearch。
有什么帮助吗?
如果您的来源是 bigint,那么大概是一个纪元。如果它是一个纪元,那么这将不起作用:
"mapping": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss"
}
因为您告诉 Elasticsearch 日期格式是 yyyy-MM-dd HH:mm:ss
(实际上不是)。
所以,试试这个(暂时省略您的自定义映射;先让它工作,然后再添加回去):
{
"index_patterns": "exchange*",
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"kafka-connect": {
"dynamic_templates": [
{
"dates": {
"match": "TIME",
"mapping": {
"type": "date"
} } } ] } } }
nothing is indexed to elasticsearch.
检查 Kafka Connect worker 日志和 Elasticsearch 日志是否有任何错误。