How to solve the "schema.registry.url" issue while connecting Elasticsearch with Kafka Connect?

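I am trying to connect Kafka Connect to the Elasticsearch sink. I am not using the confluent command; I am running in standalone mode. This is my Elasticsearch connector configuration: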

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql-jdbc-mall
key.ignore=true
schema.ignore=true
connection.url=http://172.**.*.**:5601
type.name=kafka-connect
elastic.security.protocol=SSL
key.converter.schemas.enable=false
value.converter.schemas.enable=false

My connect-standalone.properties is:

bootstrap.servers=Ni****ing:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

When I run the connector, I get this error:

[2020-01-21 09:31:03,676] ERROR Failed to start task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
        at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:58)
        at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:268)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:311)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:336)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:214)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2020-01-21 09:31:03,677] INFO Created connector elasticsearch-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)

Update

Since I don't have an "/etc/schema-registry" file, I changed connect-standalone.properties to:

bootstrap.servers=Nifi-Staging:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

But when I use JsonConverter, I get this error:

[2020-01-21 16:12:04,939] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:133)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-01-21 16:12:04,946] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-21 16:12:04,946] INFO Stopping ElasticsearchSinkTask (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:190)

io.confluent.connect.avro.AvroConverter requires schema.registry.url to be defined.

Remove both schemas.enable properties, since they only apply to JSON; Avro always has a schema. Then add the registry URL for each converter instead:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://...
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://...

You can find example Connect properties files under the etc/schema-registry folder.

If you are not using Avro, change the converters to match your data. The key and the value can also be completely different types.
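
As an illustration of mixing converter types, here is a minimal sketch of worker settings. It assumes, purely hypothetically, that the topic holds plain string keys and JSON values without an embedded schema; the converters have to match whatever format the producer or source connector actually wrote.

```properties
# Assumption: keys are plain strings, values are schemaless JSON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# schemas.enable is a JsonConverter setting; false means the messages
# are not wrapped in a {"schema": ..., "payload": ...} envelope
value.converter.schemas.enable=false
```

With schemaless JSON values, the schema.ignore=true setting already present in the connector config is what lets the Elasticsearch sink accept the records.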


Also, the Elasticsearch URL should be different; Elasticsearch runs on port 9200, for example, not on 5601, which is Kibana.
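
Applied to the connector config in the question, the change would look roughly like the line below. The host is left masked as in the question, and 9200 is only the Elasticsearch default HTTP port, so substitute whatever port your cluster actually listens on.

```properties
# Point at the Elasticsearch HTTP endpoint (default port 9200), not Kibana (5601)
connection.url=http://172.**.*.**:9200
```

The NullPointerException in JestElasticsearchClient.getServerVersion is probably a symptom of this: the connector asks the server for its version at startup, and a Kibana endpoint does not answer the way Elasticsearch does.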

"I am not using in confluent but in standalone mode."

I assume you mean the confluent command? That just runs kafka-connect-distributed for you, and distributed mode is actually the preferred one.