在 Docker 环境中启动分布式 Kafka 连接后创建 Kafka 连接器

Creating Kafka connector after distributed Kafka connect is started in Docker environment

我正在尝试在执行 connect-distributed 命令后创建一个 kafka 连接器。我写了一个 entrypoint.sh 脚本并将其与 CMD 一起使用。我有这样的 docker 文件:

FROM confluentinc/cp-kafka
RUN mkdir /plugins
RUN mkdir /config
COPY kafka-connect-couchbase-*.jar /plugins/
COPY config /config/
RUN chmod +x /config/stage/entrypoint.sh
ENV EXPOSED_PORT 8083
CMD /config/stage/entrypoint.sh

我有入口点脚本文件:

connect-distributed config/"${DEPLOY_ENV}"/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

deploy_env 无关紧要,它来自詹金斯。配置文件和distributed.properties也是无关紧要的,是正确的,我手动试过了。

Kafka 连接启动没有问题, 但是 curl 创建连接器的命令没有效果。

简而言之,我想在connect-distributed启动后创建一个连接器,而不在容器外执行任何rest请求。我该如何实现?

您需要确保您正在等待 Kafka Connect worker 完全启动。

顺便说一句,你最好从 Kafka Connect 基础镜像开始

FROM confluentinc/cp-kafka-connect-base:5.5.0

通常您会使用 Confluent Hub 安装连接器,但 Couchbase 似乎不存在,因此您必须像之前那样复制到 JAR 中。

在 Connect 图像中启动 Kafka Connect 的实际脚本是 /etc/confluent/docker/run,因此您的 /config/stage/entrypoint.sh 应该如下所示:

# Launch the worker
/etc/confluent/docker/run &

# Wait for it to start running
# Change the port here if not using the default
bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"

# Now create your connector
## Inline config example: 
curl -i -X PUT -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-file-jsonschema-as-json/config \
    -d '{
            "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "tasks.max": 1,
            "file": "/jsonschema-as-json.txt",
            "topics": "test-jsonschema"
}'
## External file example: 
curl -X POST -H "Content-Type: application/json" -d @config.json http://localhost:8083/connectors

另见 https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/

多亏了 Robin Moffatt 的绝妙解决方案,我将其与自己的需求相结合并且成功了。

由于我将图像部署到 kubernetes,/etc/confluent/docker/run & 后台命令导致容器进入完成状态而不是 运行ning。这使得容器无法通过 Rest 接口从外部访问,如下所示:

http://some-ip:31682/connectors

为了解决这个问题,我在原始问题中使用了 Dockerfile,但是通过删除 confluent docker 运行 命令并添加额外的 if 来预先检查连接器是否存在来修改 Robin 的脚本。

bash -c ' \
echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
  echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
  sleep 5
done
echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"

if [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors/cbconnector2) -ne 200 ]
then
  curl -X POST -H "Content-Type: application/json" -d @config/stage/config.json http://localhost:8083/connectors
fi'

之后,我修改了 Kubernetes 部署文件,添加 post 启动生命周期并将 entrypoint.sh 脚本作为要执行的命令,如下所示:

lifecycle:
  postStart:
    exec:
      command: ["/bin/sh", "/config/stage/entrypoint.sh"] 

基本上,它首先启动 kafka-connect,在启动 process(pod) 之后,我只需执行自定义 shell 脚本来创建 kafka 连接器。

希望这对任何有类似使用场景的人有所帮助。我也对其他(更好的)解决方案想法持开放态度。非常感谢 Robin Moffatt。