使用 kafka-connect-elasticsearch + timestamp SMT,Elasticsearch sink 只获取新消息而不是前一个消息
Elasticsearch sink only get new messages and not the previous one using kafka-connect-elasticsearch + timestamp SMT
我正在使用 kafka-connect-elasticsearch 插件从我的 kafka 获取消息到 Elasticsearch。
我在 kafka 中的数据包含一个日期字段(时间戳格式)。
我的第一个问题是,当我使用这个插件时,Elasticsearch 索引没有将日期字段识别为日期类型,而是一个 long ...
我在我的连接器配置中使用 SMT 转换解决了这个问题。
这是我当前允许在 Elastic 中推送数据的配置:
{
"name": "elasticsearch-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test.test",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink-test",
"Batch.size": 100,
"max.buffered.records": 1000,
"Max.retries": 10,
"Retry.backoff.ms": 1000,
"flush.timeout.ms": 20000,
"max.in.flight.requests": 3
"transforms": "date",
"transforms.date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.date.target.type": "Date",
"transforms.date.field": "date",
"transforms.date.format": "yyyy-MM-dd HH:mm:ss"
}
}
我现在的问题是:
Elasticsearch 不会获取存储在 kafka 中的所有先前消息,而只会获取新消息(在我启动 Elasticsearch 连接器后推送到 kafka 的所有新消息)。
如何配置连接器使弹性获取所有消息?
make elastic "understand" 日期字段是时间戳是否有任何解决方法?
(有关信息,我的数据源是带有 debezium CDC 连接器的 MongoDB)
提前致谢
How can I configure the connector to make elastic get all the messages?
就像普通的 Kafka 消费者一样,您需要将偏移量设置为最早
consumer.auto.offset.reset=earliest
Is there any workaround the make elastic "understand" that the date field is a timestamp ?
是的,在 Elasticsearch 中使用索引或动态映射。默认情况下,所有摄取的数字都只是数值。只有格式正确的日期字符串才真正被索引为日期。如果您不控制 Elasticsearch 服务器或索引设置,这通常是由该系统的管理员设置的
我正在使用 kafka-connect-elasticsearch 插件从我的 kafka 获取消息到 Elasticsearch。 我在 kafka 中的数据包含一个日期字段(时间戳格式)。
我的第一个问题是,当我使用这个插件时,Elasticsearch 索引没有将日期字段识别为日期类型,而是一个 long ... 我在我的连接器配置中使用 SMT 转换解决了这个问题。
这是我当前允许在 Elastic 中推送数据的配置:
{
"name": "elasticsearch-sink-test",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test.test",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink-test",
"Batch.size": 100,
"max.buffered.records": 1000,
"Max.retries": 10,
"Retry.backoff.ms": 1000,
"flush.timeout.ms": 20000,
"max.in.flight.requests": 3
"transforms": "date",
"transforms.date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.date.target.type": "Date",
"transforms.date.field": "date",
"transforms.date.format": "yyyy-MM-dd HH:mm:ss"
}
}
我现在的问题是: Elasticsearch 不会获取存储在 kafka 中的所有先前消息,而只会获取新消息(在我启动 Elasticsearch 连接器后推送到 kafka 的所有新消息)。
如何配置连接器使弹性获取所有消息? make elastic "understand" 日期字段是时间戳是否有任何解决方法?
(有关信息,我的数据源是带有 debezium CDC 连接器的 MongoDB)
提前致谢
How can I configure the connector to make elastic get all the messages?
就像普通的 Kafka 消费者一样,您需要将偏移量设置为最早
consumer.auto.offset.reset=earliest
Is there any workaround the make elastic "understand" that the date field is a timestamp ?
是的,在 Elasticsearch 中使用索引或动态映射。默认情况下,所有摄取的数字都只是数值。只有格式正确的日期字符串才真正被索引为日期。如果您不控制 Elasticsearch 服务器或索引设置,这通常是由该系统的管理员设置的