从另一个容器中的 flink 作业写入 docker 容器中的 kafka 主题
Writing to a kafka topic in a docker container from a flink job in another container
我正在尝试学习 Flink、Docker 和 Kafka,所以我正在尝试设置一个简单的虚拟设置,让 Flink 和 Kafka 从不同的容器进行通信。我在本地完成所有这些工作,但我还需要了解如何使用来自不同位置的容器 运行 来执行此操作,因此我不想为了让它工作而走任何捷径。
我有一个 flink 作业,目前只是 scala 欺诈检测示例的精简版。我在使用 Kafka 和 Flink 运行 的虚拟机上工作,所以这里唯一相关的部分可能是 bootstrap 服务器。以下是作业中整个主要功能的内容:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
.name("transactions")
val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[Transaction](
"flagged-transactions", // target topic
TransactionSchema("flagged-transactions"),
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
transactions.addSink(myProducer)
env.execute("transactions")
我与 docker network create flink-network
建立了一个 docker 网络。然后我用 docker run -d --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager
启动 flink,并用 docker run -d --rm --name=taskmanager --network flink-network --publish 9092:9092 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest taskmanager
启动任务管理器
目前,我的 docker-compose.yml 启动 kafka 是这样的:
version: '2'
networks:
default:
external: true
name: flink-network
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
command: sh -c "((sleep 15 && kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic flagged-transactions)&) && /etc/confluent/docker/run "
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
我用docker-compose up -d
启动kafka容器,然后去flink的web界面提交job。它运行,但随后出现异常 Failed to send data to Kafka: Topic flagged-transactions not present in metadata after 60000 ms.
如果我使用 docker exec
访问 kafka,我可以看到我的 flagged-transactions
主题在那里,所以我认为这一定是网络问题。
最初的 docker-compose.yml
来自 here,我在故障排除过程中添加了一些东西。我不太清楚它的很多网络方面,所以很多对我来说都是猜测,copy/pasting 对来自 SO 的各种问题的解决方案。我尝试了 yml 的 environment
部分的一些变体(例如,在许多排列中将 localhost
更改为 kafka
,反之亦然),但是 none他们工作。
有什么建议吗?
乍一看有几点:
公开的端口和通告的侦听器
您正在 docker-compose 上公开端口 29092,该端口配置为“kafka”主机名。如果您愿意,您可能无法通过这种方式从主机与 Kafka 通信。您需要公开 9092:9092(即本地主机),以便客户端(本地主机)的目标名称与端口上的主机名相匹配(如果有意义的话)。您链接的撰写文件公开了 29092,但在“localhost”上正确使用 29092,在“kafka”上正确使用 9092 以使其工作,您已经将其设置为相反的方式。
通过容器名称进行容器间通信
您的工作职能是 运行 在独立的 flink 容器上,尝试连接到 bootstrap-服务器“localhost:9092”,这将无法正常工作访问本身。您需要将其更改为“kafka:29092”,以便它与 Kafka 容器通信。如果你是 运行 主机上的 flink 作业,localhost:9092 就可以。
其他
进行这些更改,看看您是否有运气。您应该能够通过 localhost:9092(使用 Kafka 客户端、portqry、netcat 等)从您的主机查看代理服务器,并通过 kafka:29092(可能使用 netcat 等)从您的 flink 容器查看代理服务器。
如果网桥存在网络问题,请尝试将您的 flink 容器添加到同一个 docker-compose 文件并从 YAML 文件中删除“networks”条目以默认自行启动组网络。只要它们在自己的网络上并正确配置,容器就可以通过容器名称进行通信。
Kafka 侦听器基本上需要与客户端连接的内容相匹配。因此,如果您从您的主机连接到“localhost:9092”,您的 9092 侦听器需要是“localhost”以便预期的名称匹配,否则连接将失败。如果我们通过主机名“kafka”从另一个容器连接到端口 29092,则该侦听器需要配置为“kafka:29092”,依此类推。
如果 none 有效,post Kafka 服务器容器的一些日志,我们将从那里开始。
我正在尝试学习 Flink、Docker 和 Kafka,所以我正在尝试设置一个简单的虚拟设置,让 Flink 和 Kafka 从不同的容器进行通信。我在本地完成所有这些工作,但我还需要了解如何使用来自不同位置的容器 运行 来执行此操作,因此我不想为了让它工作而走任何捷径。
我有一个 flink 作业,目前只是 scala 欺诈检测示例的精简版。我在使用 Kafka 和 Flink 运行 的虚拟机上工作,所以这里唯一相关的部分可能是 bootstrap 服务器。以下是作业中整个主要功能的内容:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val transactions: DataStream[Transaction] = env
.addSource(new TransactionSource)
.name("transactions")
val properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")
val myProducer = new FlinkKafkaProducer[Transaction](
"flagged-transactions", // target topic
TransactionSchema("flagged-transactions"),
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance
transactions.addSink(myProducer)
env.execute("transactions")
我与 docker network create flink-network
建立了一个 docker 网络。然后我用 docker run -d --rm --name=jobmanager --network flink-network --publish 8081:8081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager
启动 flink,并用 docker run -d --rm --name=taskmanager --network flink-network --publish 9092:9092 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest taskmanager
目前,我的 docker-compose.yml 启动 kafka 是这样的:
version: '2'
networks:
default:
external: true
name: flink-network
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
command: sh -c "((sleep 15 && kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic flagged-transactions)&) && /etc/confluent/docker/run "
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
我用docker-compose up -d
启动kafka容器,然后去flink的web界面提交job。它运行,但随后出现异常 Failed to send data to Kafka: Topic flagged-transactions not present in metadata after 60000 ms.
如果我使用 docker exec
访问 kafka,我可以看到我的 flagged-transactions
主题在那里,所以我认为这一定是网络问题。
最初的 docker-compose.yml
来自 here,我在故障排除过程中添加了一些东西。我不太清楚它的很多网络方面,所以很多对我来说都是猜测,copy/pasting 对来自 SO 的各种问题的解决方案。我尝试了 yml 的 environment
部分的一些变体(例如,在许多排列中将 localhost
更改为 kafka
,反之亦然),但是 none他们工作。
有什么建议吗?
乍一看有几点:
公开的端口和通告的侦听器
您正在 docker-compose 上公开端口 29092,该端口配置为“kafka”主机名。如果您愿意,您可能无法通过这种方式从主机与 Kafka 通信。您需要公开 9092:9092(即本地主机),以便客户端(本地主机)的目标名称与端口上的主机名相匹配(如果有意义的话)。您链接的撰写文件公开了 29092,但在“localhost”上正确使用 29092,在“kafka”上正确使用 9092 以使其工作,您已经将其设置为相反的方式。
通过容器名称进行容器间通信
您的工作职能是 运行 在独立的 flink 容器上,尝试连接到 bootstrap-服务器“localhost:9092”,这将无法正常工作访问本身。您需要将其更改为“kafka:29092”,以便它与 Kafka 容器通信。如果你是 运行 主机上的 flink 作业,localhost:9092 就可以。
其他
进行这些更改,看看您是否有运气。您应该能够通过 localhost:9092(使用 Kafka 客户端、portqry、netcat 等)从您的主机查看代理服务器,并通过 kafka:29092(可能使用 netcat 等)从您的 flink 容器查看代理服务器。
如果网桥存在网络问题,请尝试将您的 flink 容器添加到同一个 docker-compose 文件并从 YAML 文件中删除“networks”条目以默认自行启动组网络。只要它们在自己的网络上并正确配置,容器就可以通过容器名称进行通信。
Kafka 侦听器基本上需要与客户端连接的内容相匹配。因此,如果您从您的主机连接到“localhost:9092”,您的 9092 侦听器需要是“localhost”以便预期的名称匹配,否则连接将失败。如果我们通过主机名“kafka”从另一个容器连接到端口 29092,则该侦听器需要配置为“kafka:29092”,依此类推。
如果 none 有效,post Kafka 服务器容器的一些日志,我们将从那里开始。