aiokafka ProducerClosed 异步生成消息时出错

aiokafka ProducerClosed error while producing messages asynchronously

我正在使用 aiokafka 异步生成消息。我有一个 Api 使用 django,它正在向 kafka 队列生成消息。它工作正常。现在,当我将相同的 api 转换为使用 aiohttp 服务器时,就会出现以下错误:-

aiokafka.errors.ProducerClosed: ProducerClosed

第一条消息生成成功。上述错误出现在第二次消息生成中。

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(
    loop=loop,
    bootstrap_servers="127.0.0.1:9092"
)
await producer.start()
response = await producer.send_and_wait(queue_name, msg)
await producer.stop()

aiokafka 文档中没有关于此错误的信息。请帮忙

编辑: 我在处理者中分享这个制作人。如果我让生产者保持开放状态,会导致任何问题吗?生产者什么时候自动关闭?

aiokafka.errors.ProducerClosed: ProducerClosed

将消息发送到已关闭的生产者时出现此错误occurs

如果您在处理程序之间共享生成器,请确保在生成第一条消息后不要关闭它。

编辑:您可以在 cleanup context

中关闭它
async def kafka(app):
    await producer.start()
    yield
    await producer.stop()


app.cleanup_ctx.append(kafka)

没有它,所有连接will try to close