如何在插槽可用后立即将新项目添加到异步队列?
How to add new item to asyncio queue as soon as a slot is available?
这段代码是这样工作的:
名为 'ids' 的列表包含 ID 号。通过 ID 号,我下载特定的消息。
'nDownload' 是列表索引。队列的大小值等于5.
我从列表中选取项目,一次下载一条消息并将其添加到队列中。
当 nDownload 等于 6 时:
- 发生QueueFull异常。
- 创建 5 个工人。
- 工作人员出于其他目的从消息中提取元数据。
- await queue.join() 阻塞,直到队列中的所有项目都被获取和处理。
- 结束 -> 删除工人。
代码有效,我到现在都没有问题。
nDownload = 0
workers = []
while (nDownload <= len(ids)):
try:
async for msg in get_messages(channel,ids=ids[nDownload]):
nDownload = nDownload + 1
try:
queue.put_nowait(msg)
except (asyncio.QueueFull,IndexError) as qErr:
nDownload = nDownload - 1
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
await queue.join()
for cancel in workers:
cancel.cancel()
except IndexError as iErr:
break
问题:
有时消息有不同的大小。例如:
消息 1 = 8 分钟内下载 100MB
消息 2 = 5 秒内下载 1MB
一旦它下载了最短的消息(消息 2),我得到一个免费的 'slot' 队列。
不幸的是,我必须等待消息 1,因为 queue.join()
那时如何添加新项目到队列?
为什么我要使用 queue.join() ?
因为我不知道如何将最多 5 条消息添加到队列中,请等待下载并恢复
我真的需要下载一组消息,而不是一次全部下载
谢谢
编辑:
是的,我的工人是这样定义的(简化)
async def worker(queue):
while True:
queue_msg = await queue.get()
loop = asyncio.get_event_loop()
try:
task = loop.create_task(extract(queue_msg))
await asyncio.wait_for(task, timeout=timeout)
except errors.Fail:
#Here I have to requeue the message when it fails,
#so it requeues the ID in order to download the same msg later
await queue.put(queue_msg.id)
except asyncio.TimeoutError:
#requeue the msg etcc...
finally:
queue.task_done()
你的回答很聪明,谢谢
但是我选择队列 'size > 1' 因为我需要在 'extract' 任务失败时重新获取消息。 (对不起我没告诉你)
我不知道如果队列大小 = 1 会发生什么并且我尝试添加项目。这个有点难
目前还不完全清楚你的约束是什么,但如果我理解正确的话:
- 您最多要同时下载 5 个东西
- 你不想浪费时间 - 一旦一个工人完成了一件物品,它就应该获得一个新的
队列大小应该与您的目的无关,它仅用作缓冲区,以防工作人员暂时快于 get_messages
。我什至会从 1 的队列大小开始,并试验更大的值是否有助于提高性能。
在 QueueFull
上生成任务似乎很奇怪而且没有必要。处理生产者-消费者模式的惯用方法是创建固定数量的消费者,并让他们在多个项目到达时对其进行处理。您没有显示 worker
,因此不清楚每个工作人员是只处理一条消息还是处理多条消息。
我会将循环重写为:
queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
async for msg in get_messages(channel, id=current):
# enqueue msg, waiting (if needed) for a free slot in the queue
await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers, which wait for a new message
# that will never arrive
for w in workers:
w.cancel()
工人可以这样定义:
async def worker(queue):
while True:
msg = await queue.get()
... process msg ...
queue.task_done()
这段代码是这样工作的:
名为 'ids' 的列表包含 ID 号。通过 ID 号,我下载特定的消息。 'nDownload' 是列表索引。队列的大小值等于5.
我从列表中选取项目,一次下载一条消息并将其添加到队列中。 当 nDownload 等于 6 时:
- 发生QueueFull异常。
- 创建 5 个工人。
- 工作人员出于其他目的从消息中提取元数据。
- await queue.join() 阻塞,直到队列中的所有项目都被获取和处理。
- 结束 -> 删除工人。
代码有效,我到现在都没有问题。
nDownload = 0
workers = []
while (nDownload <= len(ids)):
try:
async for msg in get_messages(channel,ids=ids[nDownload]):
nDownload = nDownload + 1
try:
queue.put_nowait(msg)
except (asyncio.QueueFull,IndexError) as qErr:
nDownload = nDownload - 1
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
await queue.join()
for cancel in workers:
cancel.cancel()
except IndexError as iErr:
break
问题: 有时消息有不同的大小。例如:
消息 1 = 8 分钟内下载 100MB
消息 2 = 5 秒内下载 1MB
一旦它下载了最短的消息(消息 2),我得到一个免费的 'slot' 队列。 不幸的是,我必须等待消息 1,因为 queue.join()
那时如何添加新项目到队列?
为什么我要使用 queue.join() ? 因为我不知道如何将最多 5 条消息添加到队列中,请等待下载并恢复 我真的需要下载一组消息,而不是一次全部下载 谢谢
编辑: 是的,我的工人是这样定义的(简化)
async def worker(queue):
while True:
queue_msg = await queue.get()
loop = asyncio.get_event_loop()
try:
task = loop.create_task(extract(queue_msg))
await asyncio.wait_for(task, timeout=timeout)
except errors.Fail:
#Here I have to requeue the message when it fails,
#so it requeues the ID in order to download the same msg later
await queue.put(queue_msg.id)
except asyncio.TimeoutError:
#requeue the msg etcc...
finally:
queue.task_done()
你的回答很聪明,谢谢 但是我选择队列 'size > 1' 因为我需要在 'extract' 任务失败时重新获取消息。 (对不起我没告诉你) 我不知道如果队列大小 = 1 会发生什么并且我尝试添加项目。这个有点难
目前还不完全清楚你的约束是什么,但如果我理解正确的话:
- 您最多要同时下载 5 个东西
- 你不想浪费时间 - 一旦一个工人完成了一件物品,它就应该获得一个新的
队列大小应该与您的目的无关,它仅用作缓冲区,以防工作人员暂时快于 get_messages
。我什至会从 1 的队列大小开始,并试验更大的值是否有助于提高性能。
在 QueueFull
上生成任务似乎很奇怪而且没有必要。处理生产者-消费者模式的惯用方法是创建固定数量的消费者,并让他们在多个项目到达时对其进行处理。您没有显示 worker
,因此不清楚每个工作人员是只处理一条消息还是处理多条消息。
我会将循环重写为:
queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
async for msg in get_messages(channel, id=current):
# enqueue msg, waiting (if needed) for a free slot in the queue
await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers, which wait for a new message
# that will never arrive
for w in workers:
w.cancel()
工人可以这样定义:
async def worker(queue):
while True:
msg = await queue.get()
... process msg ...
queue.task_done()