Kafka Connect (Confluent 5.0, 4.1.2 or 3.0) not starting
We have an SSL-enabled Kafka cluster (hosted as a third-party managed service). We are now trying to set up Kafka Connect (Confluent 5.0) with a third-party sink (the WePay BigQuery connector). Starting Kafka Connect in standalone mode works fine. Unfortunately, when switching to distributed mode, Kafka Connect fails with the following:
[2018-09-25 15:01:46,248] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-09-25 15:01:46,248] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-09-25 15:01:46,667] INFO Kafka cluster ID: Q9PaAEeWSbOavVmHTQS5sA (org.apache.kafka.connect.util.ConnectUtils:59)
[2018-09-25 15:01:46,685] INFO Logging initialized @10512ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193)
[2018-09-25 15:01:46,726] INFO Added connector for http://:8083 (org.apache.kafka.connect.runtime.rest.RestServer:119)
[2018-09-25 15:01:46,760] INFO Advertised URI: http://192.168.4.207:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:267)
[2018-09-25 15:01:46,796] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-09-25 15:01:46,796] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:117)
java.lang.NoSuchMethodError: org.apache.kafka.common.metrics.Sensor.add(Lorg/apache/kafka/common/metrics/CompoundStat;)Z
at org.apache.kafka.connect.runtime.Worker$WorkerMetricsGroup.<init>(Worker.java:731)
at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:112)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:88)
We have tried googling the specific error but could not find anything. It looks like a version problem somewhere (hence the NoSuchMethodError), but we do not know where to start.
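One way to narrow down where the conflicting classes come from is a small check like the sketch below. This is only an assumption on our side: compile and run it with the exact same classpath the Connect worker uses (e.g. the jars under the Confluent share/java directories); the class names are taken from the stack traces above.

public class KafkaClasspathCheck {
    public static void main(String[] args) {
        // Print which jar actually provides the class from the failing call
        // (org.apache.kafka.common.metrics.Sensor lives in kafka-clients)
        System.out.println(org.apache.kafka.common.metrics.Sensor.class
                .getProtectionDomain().getCodeSource().getLocation());
        // ...and which jar provides the Connect runtime Worker that calls it
        System.out.println(org.apache.kafka.connect.runtime.Worker.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}

If the two locations point at jars from different Kafka releases (for example an old kafka-clients 1.0.0 jar sitting next to the Confluent 5.0 Connect runtime, which would also match the "Kafka version : 1.0.0" lines in the log), that mismatch could explain the NoSuchMethodError.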
With Confluent 4.1.2 we get a different error:
[2018-09-26 15:14:05,498] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:112)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:144)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:182)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:159)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:95)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.<init>(Lorg/apache/kafka/common/utils/LogContext;Lorg/apache/kafka/clients/KafkaClient;Lorg/apache/kafka/clients/Metadata;Lorg/apache/kafka/common/utils/Time;JJI)V
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:114)
... 3 more
When we try the same setup with Kafka Connect (Confluent 3.0), we get yet another error:
[2018-09-26 10:04:24,588] INFO AvroDataConfig values:
schemas.cache.config = 1000
enhanced.avro.schema.support = false
connect.meta.data = true
(io.confluent.connect.avro.AvroDataConfig:169)
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(Ljava/lang/String;Ljava/lang/String;)V
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.stop(WorkerGroupMember.java:194)
at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:122)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:150)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:132)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:82)
Here is distributed.properties:
bootstrap.servers=*****
group.id=testGroup
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=****
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=****
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
security.protocol=SSL
ssl.truststore.location=truststore.jks
ssl.truststore.password=****
ssl.keystore.type=PKCS12
ssl.keystore.location=keystore.p12
ssl.keystore.password=****
ssl.key.password=****
plugin.path=/*/confluent-5.0.0/share/java
And, for reference, standalone.properties:
bootstrap.servers=***
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=***
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=***
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=connect.offsets
consumer.security.protocol=SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=***
consumer.ssl.keystore.type=PKCS12
consumer.ssl.keystore.location=keystore.p12
consumer.ssl.keystore.password=***
consumer.ssl.key.password=***
Any help would be greatly appreciated.
I just found out that you have to prefix the Kafka client configuration in the Kafka Connect properties file:
https://docs.confluent.io/current/connect/userguide.html#overriding-producer-and-consumer-settings
Your standalone.properties does prefix the configuration with consumer.:
consumer.security.protocol=SSL
But your distributed.properties does not:
security.protocol=SSL
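For reference, a sketch of how the SSL section of distributed.properties could look with the prefixed overrides added, based on your config above and the linked docs. Hosts and passwords remain placeholders; the unprefixed settings stay in place for the worker's own internal clients, and for a sink connector the consumer.* overrides are the ones that matter (producer.* is shown only for completeness):

# worker-level settings (used by Connect's internal config/offset/status clients)
security.protocol=SSL
ssl.truststore.location=truststore.jks
ssl.truststore.password=****
ssl.keystore.type=PKCS12
ssl.keystore.location=keystore.p12
ssl.keystore.password=****
ssl.key.password=****
# prefixed overrides for the clients Connect creates for connectors, per the docs above
consumer.security.protocol=SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=****
consumer.ssl.keystore.type=PKCS12
consumer.ssl.keystore.location=keystore.p12
consumer.ssl.keystore.password=****
consumer.ssl.key.password=****
producer.security.protocol=SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=****
producer.ssl.keystore.type=PKCS12
producer.ssl.keystore.location=keystore.p12
producer.ssl.keystore.password=****
producer.ssl.key.password=****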