Python、Kafka 和 Docker - KafkaConsumer 一直挂起
Python, Kafka and Docker - KafkaConsumer keeps hanging
我有以下 docker-compose 文件:
version: '3.1'
services:
postgres_db:
image: postgres
restart: always
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: admin
POSTGRES_DB: default_db
ports:
- 54320:5432
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
使用 docker-compose up
运行 后 - 从终端输出看一切正常。
我启动 python 控制台并 运行 以下行:
kp = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,10),
value_serializer=lambda x:
dumps(x).encode('utf-8'))
kc = KafkaConsumer('test', bootstrap_servers=['localhost:9092'],api_version=(0,10),group_id=None,auto_offset_reset='earliest',
value_deserializer=lambda json_data: json.loads(json_data.decode('utf-8')))
data = {"test":"test"}
kp.send(topic="test",value=data)
for message in kc:
print(message.value)
但是在 运行 执行此操作后,控制台只是挂起,并且消息看起来不像是 consumed/produced。任何想法这里出了什么问题?谢谢!
要么您需要 运行 您的 Python 容器中的代码并设置
bootstrap_servers=['kafka:9092']
或者您需要将 Kafka 通告给主机上的客户端
KAFKA_ADVERTISED_HOST_NAME: localhost
您也可以阅读有关 HOSTNAME_COMMAND
用法的 wurstmeister README
我还建议 运行在测试生产者和消费者时将它们分开
我有以下 docker-compose 文件:
version: '3.1'
services:
postgres_db:
image: postgres
restart: always
environment:
POSTGRES_USER: admin
POSTGRES_PASSWORD: admin
POSTGRES_DB: default_db
ports:
- 54320:5432
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
使用 docker-compose up
运行 后 - 从终端输出看一切正常。
我启动 python 控制台并 运行 以下行:
kp = KafkaProducer(bootstrap_servers=['localhost:9092'],api_version=(0,10),
value_serializer=lambda x:
dumps(x).encode('utf-8'))
kc = KafkaConsumer('test', bootstrap_servers=['localhost:9092'],api_version=(0,10),group_id=None,auto_offset_reset='earliest',
value_deserializer=lambda json_data: json.loads(json_data.decode('utf-8')))
data = {"test":"test"}
kp.send(topic="test",value=data)
for message in kc:
print(message.value)
但是在 运行 执行此操作后,控制台只是挂起,并且消息看起来不像是 consumed/produced。任何想法这里出了什么问题?谢谢!
要么您需要 运行 您的 Python 容器中的代码并设置
bootstrap_servers=['kafka:9092']
或者您需要将 Kafka 通告给主机上的客户端
KAFKA_ADVERTISED_HOST_NAME: localhost
您也可以阅读有关 HOSTNAME_COMMAND
用法的 wurstmeister README
我还建议 运行在测试生产者和消费者时将它们分开