How to solve kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs?
My docker-compose.yml is:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
My Dockerfile is:
FROM python
MAINTAINER Shubham Joshi
ADD hello.py /
ADD transactions_producer.py /
COPY requirements.txt .
RUN pip3 install -r requirements.txt
CMD ["python","./transactions_producer.py"]
The producer code (transactions_producer.py) is:
from kafka import KafkaProducer
from time import sleep
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         api_version=(0, 10, 0),
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for i in range(10):
    producer.send('shubham', create_random_transaction())
    sleep(5)
    print("Success", i)

producer.close()
shubham is the topic I created, but I cannot produce/publish any messages to it and I get this timeout error.
Step 1: build the image:
docker build -t python_producer2 .
The build succeeds, and then I run
Step 2: docker run python_producer2
This is where I get the error:
Traceback (most recent call last):
File "./transactions_producer.py", line 52, in <module>
producer.send('shubham', create_random_transaction())
File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 564, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 690, in _wait_on_metadata
raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
When your code runs inside Docker, localhost refers to the Docker container itself. That is why the connection times out: Kafka is not running in that container.
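To see why, note that localhost always resolves to the loopback interface of whichever machine (or container) is running the code, never to another host; a quick standalone check:

```python
import socket

# 'localhost' resolves to the loopback interface of the machine
# (or container) this code runs on, so inside a container it can
# never reach a broker running on a different container or host.
print(socket.gethostbyname('localhost'))  # 127.0.0.1
```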
The first thing to do is put your container on the same Docker network as the broker. Either add it to your Docker Compose file, or specify the Compose network when you execute docker run. The former is probably easier:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  client:
    image: python_producer2
    container_name: python_producer2
    depends_on:
      - kafka
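If you would rather not add the client to Compose, the docker run alternative mentioned above would look something like the following. The network name is an assumption: Compose derives it from the project directory name (here assumed to be myfolder, giving myfolder_default), so check the actual name with docker network ls.

```shell
# Attach the producer container to the Compose network so it can
# resolve the broker by its service name ("kafka").
docker run --network myfolder_default python_producer2
```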
However, because your client connects to Kafka from a different server (containers are effectively separate servers), it cannot use localhost, nor can it connect to the broker listener that returns localhost as its advertised.listener. So change your code to:
producer = KafkaProducer(bootstrap_servers=['kafka:29092'],api_version=(0, 10, 0),value_serializer=lambda v: json.dumps(v).encode('utf-8'))
This tells it to bootstrap against the broker at kafka:29092, and to use the listener on port 29092, which exposes kafka:29092 as its advertised.listener. To learn more about advertised listeners, see https://rmoff.net/2018/08/02/kafka-listeners-explained/
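As a side note, the value_serializer passed to KafkaProducer in that line simply JSON-encodes each message into UTF-8 bytes before it goes on the wire; a minimal standalone sketch of what it does:

```python
import json

# Same serializer as passed to KafkaProducer(value_serializer=...):
# any JSON-serializable value becomes a UTF-8 byte string.
value_serializer = lambda v: json.dumps(v).encode('utf-8')

payload = value_serializer({"id": 1, "amount": 99.5})
print(payload)  # b'{"id": 1, "amount": 99.5}'
```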