How to solve kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs?

How to solve kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs?

我的docker-compose.yml代码是:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

我的 docker 文件是

FROM python
MAINTAINER Shubham Joshi 
ADD hello.py /
ADD transactions_producer.py /
COPY requirements.txt .
RUN pip3 install -r requirements.txt
CMD ["python","./transactions_producer.py"]

python producer.py 的代码是:

from kafka import KafkaProducer
from time import sleep
from kafka.errors import KafkaError
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0, 10, 0),value_serializer=lambda v: json.dumps(v).encode('utf-8'))


for i in range(10):
    producer.send('shubham', create_random_transaction())
    sleep(5)
    print("Success",i)

producer.close()

shubham 是我创建的主题,但我无法 produce/publish 主题中的消息并收到此超时错误。

第 1 步:创建图像:

docker build -t python_producer2 .

搭建成功,我运行

step 2:docker run python_producer2

这是我收到此错误的地方

Traceback (most recent call last):
  File "./transactions_producer.py", line 52, in <module>
    producer.send('shubham', create_random_transaction())
  File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 564, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.8/site-packages/kafka/producer/kafka.py", line 690, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

当您的代码在 Docker 中运行时,localhost 表示 Docker 容器本身。这就是连接超时的原因,因为 Kafka 不在容器上 运行。

首先要做的是让您的容器与代理位于同一 Docker 网络中。要么将其添加到您的 Docker Compose 中,要么在执行 docker run 时指定 Docker Compose 的网络。前者可能更容易:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  client:
    image: python_producer2
    container_name: python_producer2
    depends_on: 
      - kafka

但是,因为你的客户端连接到不同服务器上的 Kafka(因为容器实际上是独立的服务器),它不能使用 localhost,它也不能连接到将 return localhost 作为 advertised.listener 的代理上的侦听器。因此,将您的代码更改为:

producer = KafkaProducer(bootstrap_servers=['kafka:29092'],api_version=(0, 10, 0),value_serializer=lambda v: json.dumps(v).encode('utf-8'))

这将告诉它在 kafka:29092 上找到代理,并在 29092 上找到将 kafka:29092 公开为 advertised.listener 的侦听器。要了解有关广告听众的更多信息,请参阅 https://rmoff.net/2018/08/02/kafka-listeners-explained/