Kafka Connect 无法通过 SSL 读取 Kafka 主题
Kafka Connect failing to read from Kafka topics over SSL
运行 kafka 在我们的 docker-swarm 中连接,使用以下组合文件:
cp-kafka-connect-node:
image: confluentinc/cp-kafka-connect:5.1.0
ports:
- 28085:28085
secrets:
- kafka.truststore.jks
- source: kafka-connect-aws-credentials
target: /root/.aws/credentials
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_LOG4J_ROOT_LEVEL: TRACE
CONNECT_REST_PORT: 28085
CONNECT_GROUP_ID: cp-kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: dev_cp-kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC: dev_cp-kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: dev_cp-kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_PLUGIN_PATH: /usr/share/java/
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
KAFKA_HEAP_OPTS: '-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2'
deploy:
replicas: 1
resources:
limits:
cpus: '0.50'
memory: 4gb
restart_policy:
condition: on-failure
delay: 10s
max_attempts: 3
window: 2000s
secrets:
kafka.truststore.jks:
external: true
kafka-connect-aws-credentials:
external: true
kafka 连接节点启动成功,我可以设置任务并查看这些任务的状态...
我设置的连接器称为 kafka-sink,我使用以下配置创建它:
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"flush.size": "1",
"schema.compatibility": "NONE",
"tasks.max": "1",
"topics": "input-topic-name",
"s3.part.size": "5242880",
"timezone": "UTC",
"directory.delim": "/",
"locale": "UK",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "kafka-sink",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "my-s3-bucket",
"rotate.schedule.interval.ms": "60000"
}
这个任务现在说它是 运行ning。
当我没有包含 SSL 配置时,具体来说:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
而是指向 bootstrap 没有安全保护的服务器:
CONNECT_BOOTSTRAP_SERVERS: insecurekafka:9092
它工作正常,从适当的输入主题中读取,并输出到默认分区的 S3 存储桶...
然而,当我 运行 它针对我的安全 kafka 主题使用 SSL 配置时,它没有记录任何错误,也没有抛出任何异常,但是尽管数据不断被推送到输入主题,它什么也不做。 .
我是不是做错了什么?
这是我第一次使用 Kafka Connect,通常,我使用 Spring 启动应用程序连接到 kafka,您只需在配置中指定信任库位置和密码。
我的撰写文件或任务配置中是否缺少某些配置?
我认为您需要为消费者和生产者添加 SSL 配置。在这里查看 Kafka Connect Encrypt with SSL
像这样
security.protocol=SSL
ssl.truststore.location=~/kafka.truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=~/kafka.client.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
producer.security.protocol=SSL
producer.ssl.truststore.location=~/kafka.truststore.jks
producer.ssl.truststore.password=<password>
producer.ssl.keystore.location=~/kafka.client.keystore.jks
producer.ssl.keystore.password=<password>
producer.ssl.key.password=<password>
运行 kafka 在我们的 docker-swarm 中连接,使用以下组合文件:
cp-kafka-connect-node:
image: confluentinc/cp-kafka-connect:5.1.0
ports:
- 28085:28085
secrets:
- kafka.truststore.jks
- source: kafka-connect-aws-credentials
target: /root/.aws/credentials
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_LOG4J_ROOT_LEVEL: TRACE
CONNECT_REST_PORT: 28085
CONNECT_GROUP_ID: cp-kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: dev_cp-kafka-connect-config
CONNECT_OFFSET_STORAGE_TOPIC: dev_cp-kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: dev_cp-kafka-connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_PLUGIN_PATH: /usr/share/java/
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
KAFKA_HEAP_OPTS: '-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap -XX:MaxRAMFraction=2'
deploy:
replicas: 1
resources:
limits:
cpus: '0.50'
memory: 4gb
restart_policy:
condition: on-failure
delay: 10s
max_attempts: 3
window: 2000s
secrets:
kafka.truststore.jks:
external: true
kafka-connect-aws-credentials:
external: true
kafka 连接节点启动成功,我可以设置任务并查看这些任务的状态...
我设置的连接器称为 kafka-sink,我使用以下配置创建它:
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "eu-central-1",
"flush.size": "1",
"schema.compatibility": "NONE",
"tasks.max": "1",
"topics": "input-topic-name",
"s3.part.size": "5242880",
"timezone": "UTC",
"directory.delim": "/",
"locale": "UK",
"s3.compression.type": "gzip",
"format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "kafka-sink",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "my-s3-bucket",
"rotate.schedule.interval.ms": "60000"
}
这个任务现在说它是 运行ning。
当我没有包含 SSL 配置时,具体来说:
CONNECT_BOOTSTRAP_SERVERS: kafka01:9093,kafka02:9093,kafka03:9093
CONNECT_SECURITY_PROTOCOL: SSL
CONNECT_SSL_TRUSTSTORE_LOCATION: /run/secrets/kafka.truststore.jks
CONNECT_SSL_TRUSTSTORE_PASSWORD: ********
而是指向 bootstrap 没有安全保护的服务器:
CONNECT_BOOTSTRAP_SERVERS: insecurekafka:9092
它工作正常,从适当的输入主题中读取,并输出到默认分区的 S3 存储桶...
然而,当我 运行 它针对我的安全 kafka 主题使用 SSL 配置时,它没有记录任何错误,也没有抛出任何异常,但是尽管数据不断被推送到输入主题,它什么也不做。 .
我是不是做错了什么?
这是我第一次使用 Kafka Connect,通常,我使用 Spring 启动应用程序连接到 kafka,您只需在配置中指定信任库位置和密码。
我的撰写文件或任务配置中是否缺少某些配置?
我认为您需要为消费者和生产者添加 SSL 配置。在这里查看 Kafka Connect Encrypt with SSL 像这样
security.protocol=SSL
ssl.truststore.location=~/kafka.truststore.jks
ssl.truststore.password=<password>
ssl.keystore.location=~/kafka.client.keystore.jks
ssl.keystore.password=<password>
ssl.key.password=<password>
producer.security.protocol=SSL
producer.ssl.truststore.location=~/kafka.truststore.jks
producer.ssl.truststore.password=<password>
producer.ssl.keystore.location=~/kafka.client.keystore.jks
producer.ssl.keystore.password=<password>
producer.ssl.key.password=<password>