在 Asyncio 中兼顾生产者和消费者 Python
Juggle Producer and Consumer in Asyncio Python
我有一个 生产者 和一个 消费者 异步函数。首先是生产者 运行 ,然后是三个消费者同时消费任务。我想无限期地继续这个节奏。
- 停止 生产者 并向 消费者 发送信号以启动 运行ning 的最佳方法是什么?
- 停止 消费者 并向 生产者 发送信号以启动 运行ning 的最佳方法是什么?
我当前的设置如下所示:
import asyncio
import time
async def producer(event):
n = 0
while True:
print("Running producer...")
await asyncio.sleep(0.5)
n += 1
if n == 2:
event.set()
break
async def consumer(event):
await event.wait()
print("Running consumer...")
await asyncio.sleep(0.5)
async def main():
event = asyncio.Event()
tasks = [asyncio.create_task(producer(event))] + [
asyncio.create_task(consumer(event)) for _ in range(3)
]
await asyncio.gather(*tasks)
while True:
asyncio.run(main())
print("\nSleeping for 1 sec...\n")
time.sleep(1)
这会产生以下输出:
Running producer...
Running producer...
Running consumer...
Running consumer...
Running consumer...
Sleeping for 1 sec...
Running producer...
Running producer...
Running consumer...
Running consumer...
Running consumer...
上面的代码片段将 运行 生产者和两个消费者无限期地。这按预期工作:
- 生产者和两个消费者运行并发
- 围绕
asyncio.run
的 while
循环是 运行 无限期地激活系统
但是,我想知道是否有更好的同步技术来实现这种long-运行ning周期?
使用事件唤醒消费者的另一种方法是使用队列同步生产者和消费者。代码看起来像
import asyncio
async def producer(queue: asyncio.Queue):
while True:
print("Running producer...")
message = await fetch_message_from_sqs()
if message:
await queue.put(message)
await asyncio.sleep(0.5)
async def consumer(queue: asyncio.Queue):
while True:
print("Running consumer...")
if queue.empty():
await asyncio.sleep(1)
continue
message = await queue.get()
print(message)
await acknowledge_message(message)
async def main():
# You can set a max size if you want to prevent pull too many messages from SQS.
queue = asyncio.Queue()
tasks = asyncio.gather(producer(queue), *[consumer(queue) for _ in range(3)])
if __name__ == "__main__":
asyncio.run(main())
我有一个 生产者 和一个 消费者 异步函数。首先是生产者 运行 ,然后是三个消费者同时消费任务。我想无限期地继续这个节奏。
- 停止 生产者 并向 消费者 发送信号以启动 运行ning 的最佳方法是什么?
- 停止 消费者 并向 生产者 发送信号以启动 运行ning 的最佳方法是什么?
我当前的设置如下所示:
import asyncio
import time
async def producer(event):
n = 0
while True:
print("Running producer...")
await asyncio.sleep(0.5)
n += 1
if n == 2:
event.set()
break
async def consumer(event):
await event.wait()
print("Running consumer...")
await asyncio.sleep(0.5)
async def main():
event = asyncio.Event()
tasks = [asyncio.create_task(producer(event))] + [
asyncio.create_task(consumer(event)) for _ in range(3)
]
await asyncio.gather(*tasks)
while True:
asyncio.run(main())
print("\nSleeping for 1 sec...\n")
time.sleep(1)
这会产生以下输出:
Running producer...
Running producer...
Running consumer...
Running consumer...
Running consumer...
Sleeping for 1 sec...
Running producer...
Running producer...
Running consumer...
Running consumer...
Running consumer...
上面的代码片段将 运行 生产者和两个消费者无限期地。这按预期工作:
- 生产者和两个消费者运行并发
- 围绕
asyncio.run
的while
循环是 运行 无限期地激活系统
但是,我想知道是否有更好的同步技术来实现这种long-运行ning周期?
使用事件唤醒消费者的另一种方法是使用队列同步生产者和消费者。代码看起来像
import asyncio
async def producer(queue: asyncio.Queue):
while True:
print("Running producer...")
message = await fetch_message_from_sqs()
if message:
await queue.put(message)
await asyncio.sleep(0.5)
async def consumer(queue: asyncio.Queue):
while True:
print("Running consumer...")
if queue.empty():
await asyncio.sleep(1)
continue
message = await queue.get()
print(message)
await acknowledge_message(message)
async def main():
# You can set a max size if you want to prevent pull too many messages from SQS.
queue = asyncio.Queue()
tasks = asyncio.gather(producer(queue), *[consumer(queue) for _ in range(3)])
if __name__ == "__main__":
asyncio.run(main())