KafkaUnavailableError in a Python app

I am new to Kafka and am following this tutorial to understand a simple Python application. When I run producer.py, I get the following error:

raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
kafka.errors.KafkaUnavailableError: KafkaUnavailableError: All servers failed to process request: [('localhost', 9092, 0)]

I'm not sure what I'm missing. I made sure the Kafka service was started with:

brew services start kafka

Any suggestions? Here is my producer.py:

import time
import cv2
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'my-topic'

def video_emitter(video):
    # Open the video
    video = cv2.VideoCapture(video)
    print(' emitting.....')

    # read the file
    while video.isOpened():
        # read the image in each frame
        success, image = video.read()
        # check if the file has read to the end
        if not success:
            break
        # encode the frame as PNG
        ret, png = cv2.imencode('.png', image)
        # convert the encoded image to bytes and send to Kafka
        producer.send_messages(topic, png.tobytes())
        # To reduce CPU usage create sleep time of 0.2sec  
        time.sleep(0.2)
    # clear the capture
    video.release()
    print('done emitting')

if __name__ == '__main__':
    video_emitter('video.mp4')

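It looks like you are using the legacy producer API (SimpleProducer together with KafkaClient), which has been deprecated. Try using KafkaProducer instead of SimpleProducer: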

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
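For reference, here is a rough sketch of how the sending loop from the question might look with KafkaProducer. The helper names `emit_frames` and `make_producer` are made up for illustration and are not part of kafka-python. The sketch assumes each frame is already encoded as bytes (for example via `cv2.imencode(...)[1].tobytes()`) and that the broker is reachable at `localhost:9092`.

```python
import time


def emit_frames(producer, topic, frames, delay=0.0):
    """Send each frame (bytes) to `topic`, then flush. Returns the number sent.

    `producer` is expected to behave like kafka.KafkaProducer, i.e. to
    provide send(topic, value) and flush().
    """
    sent = 0
    for frame in frames:
        # KafkaProducer.send() is asynchronous and expects bytes unless a
        # value_serializer was configured on the producer.
        producer.send(topic, frame)
        sent += 1
        if delay:
            # Optional pause between frames, as in the original script.
            time.sleep(delay)
    # Block until all buffered messages have been handed to the broker.
    producer.flush()
    return sent


def make_producer(servers='localhost:9092'):
    # Hypothetical convenience wrapper; the import is done lazily so that
    # emit_frames() can be tried out without kafka-python or a broker.
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=servers)
```

With a broker running, something like `emit_frames(make_producer(), 'my-topic', frames, delay=0.2)` would replace the `producer.send_messages(...)` loop. If the connection still fails, the problem is more likely the broker address or listener configuration than the client API.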