aiokafka ProducerClosed 异步生成消息时出错
aiokafka ProducerClosed error while producing messages asynchronously
我正在使用 aiokafka 异步生成消息。我有一个 Api 使用 django,它正在向 kafka 队列生成消息。它工作正常。现在,当我将相同的 api 转换为使用 aiohttp 服务器时,就会出现以下错误:-
aiokafka.errors.ProducerClosed: ProducerClosed
第一条消息生成成功。上述错误出现在第二次消息生成中。
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
loop=loop,
bootstrap_servers="127.0.0.1:9092"
)
await producer.start()
response = await producer.send_and_wait(queue_name, msg)
await producer.stop()
aiokafka 文档中没有关于此错误的信息。请帮忙
编辑:
我在处理者中分享这个制作人。如果我让生产者保持开放状态,会导致任何问题吗?生产者什么时候自动关闭?
aiokafka.errors.ProducerClosed: ProducerClosed
将消息发送到已关闭的生产者时出现此错误occurs。
如果您在处理程序之间共享生成器,请确保在生成第一条消息后不要关闭它。
编辑:您可以在 cleanup context
中关闭它
async def kafka(app):
await producer.start()
yield
await producer.stop()
app.cleanup_ctx.append(kafka)
没有它,所有连接will try to close
我正在使用 aiokafka 异步生成消息。我有一个 Api 使用 django,它正在向 kafka 队列生成消息。它工作正常。现在,当我将相同的 api 转换为使用 aiohttp 服务器时,就会出现以下错误:-
aiokafka.errors.ProducerClosed: ProducerClosed
第一条消息生成成功。上述错误出现在第二次消息生成中。
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
loop=loop,
bootstrap_servers="127.0.0.1:9092"
)
await producer.start()
response = await producer.send_and_wait(queue_name, msg)
await producer.stop()
aiokafka 文档中没有关于此错误的信息。请帮忙
编辑: 我在处理者中分享这个制作人。如果我让生产者保持开放状态,会导致任何问题吗?生产者什么时候自动关闭?
aiokafka.errors.ProducerClosed: ProducerClosed
将消息发送到已关闭的生产者时出现此错误occurs。
如果您在处理程序之间共享生成器,请确保在生成第一条消息后不要关闭它。
编辑:您可以在 cleanup context
中关闭它async def kafka(app):
await producer.start()
yield
await producer.stop()
app.cleanup_ctx.append(kafka)
没有它,所有连接will try to close