kafka.common.KafkaException: Failed to parse the broker info from zookeeper from EC2 to elastic search
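I have set up AWS MSK and I am trying to sink records from MSK to Elasticsearch.
I am able to push data into MSK in JSON format.
I want to sink it to Elasticsearch.
I was able to complete all of the setup correctly.
This is what I did on the EC2 instance: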
wget http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent
/usr/local/confluent/etc/kafka-connect-elasticsearch
After that I modified the kafka-connect-elasticsearch properties and set my Elasticsearch URL:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect
The producer sends messages in the following format:
{
"data": {
"RequestID": 517082653,
"ContentTypeID": 9,
"OrgID": 16145,
"UserID": 4,
"PromotionStartDateTime": "2019-12-14T16:06:21Z",
"PromotionEndDateTime": "2019-12-14T16:16:04Z",
"SystemStartDatetime": "2019-12-14T16:17:45.507000000Z"
},
"metadata": {
"timestamp": "2019-12-29T10:37:31.502042Z",
"record-type": "data",
"operation": "insert",
"partition-key-type": "schema-table",
"schema-name": "dbo",
"table-name": "TRFSDIQueue"
}
}
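I am a little confused about whether Kafka Connect has to be started here.
If so, how do I start it?
I also started Schema Registry as shown below, which gave me an error.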
/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
When I do that, I get the following error:
[2019-12-29 13:49:17,861] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/
Please help.
As suggested in the answer, I upgraded the Kafka Connect version, but then I started getting the following error:
ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
at io.confluent.rest.Application.createServer(Application.java:201)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
... 5 more
Caused by: java.util.concurrent.TimeoutException
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
... 7 more
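First, Confluent Platform 3.1.2 is quite old. I suggest you get a version that aligns with your Kafka version.
You start Kafka Connect using the appropriate connect-* scripts and properties files located under the bin and etc/kafka folders.
For example,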
/usr/local/confluent/bin/connect-standalone \
/usr/local/confluent/etc/kafka/connect-standalone.properties \
/usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
If that works, you can move on to using the connect-distributed command.
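For the JSON messages shown in the question, a minimal sketch of the two files that connect-standalone takes might look like the following. It assumes the records carry no embedded schema, so JsonConverter is used with schemas.enable=false and the sink is told to ignore schemas. The broker address, the CONF_DIR directory, and the file names are placeholders, not values from the question.

```shell
#!/bin/sh
# Sketch only: writes two illustrative config files into a scratch directory.
set -eu

CONF_DIR="${CONF_DIR:-./connect-sketch}"   # hypothetical output directory
# Placeholder broker address; substitute the MSK bootstrap broker string.
BOOTSTRAP="${BOOTSTRAP:-b-1.example.kafka.us-east-1.amazonaws.com:9092}"
mkdir -p "$CONF_DIR"

# Worker settings: plain JSON keys and values without an embedded schema,
# so Schema Registry is not involved.
cat > "$CONF_DIR/worker.properties" <<EOF
bootstrap.servers=$BOOTSTRAP
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
EOF

# Connector settings: the question's config plus schema.ignore=true,
# which lets Elasticsearch infer the mapping from the JSON documents.
cat > "$CONF_DIR/elasticsearch-sink.properties" <<'EOF'
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
schema.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect
EOF
```

With files like these, the worker would be started by passing worker.properties first and elasticsearch-sink.properties second to connect-standalone. If the cluster only accepts TLS connections, security.protocol=SSL and consumer.security.protocol=SSL would likely also be needed in the worker file.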
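Regarding Schema Registry, you can search its GitHub issues and find multiple people trying to get it working with MSK, but the root issue is related to MSK not exposing a PLAINTEXT listener and Schema Registry not supporting named listeners. (This may have changed since version 5.x.)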
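One thing that may be worth trying on a 5.x Schema Registry is pointing it at the brokers directly with kafkastore.bootstrap.servers rather than at ZooKeeper with kafkastore.connection.url, so that it does not have to parse the broker registration shown in the error. The sketch below writes such a file. The broker address and the file path are placeholders, and this has not been verified against MSK.

```shell
#!/bin/sh
# Sketch only: writes an illustrative Schema Registry properties file.
set -eu

SR_CONF="${SR_CONF:-./schema-registry-sketch.properties}"   # hypothetical path
# Placeholder broker address; substitute the MSK bootstrap broker string.
BOOTSTRAP="${BOOTSTRAP:-b-1.example.kafka.us-east-1.amazonaws.com:9092}"

# kafkastore.connection.url (ZooKeeper) is deliberately left out, so the
# registry talks to the brokers directly instead of reading the
# broker znodes that carry the named listeners.
cat > "$SR_CONF" <<EOF
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://$BOOTSTRAP
kafkastore.topic=_schemas
EOF
```

The resulting file would be passed to schema-registry-start in place of the stock schema-registry.properties. For a TLS listener, the prefix would become SSL:// and kafkastore.security.protocol=SSL would be added.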
You could also try running the Connect and Schema Registry containers in ECS/EKS rather than extracting the archive on an EC2 machine.