如何在插槽可用后立即将新项目添加到异步队列?

How to add new item to asyncio queue as soon as a slot is available?

这段代码是这样工作的:

名为 'ids' 的列表包含 ID 号。通过 ID 号,我下载特定的消息。 'nDownload' 是列表索引。队列的大小值等于5.

我从列表中选取项目,一次下载一条消息并将其添加到队列中。 当 nDownload 等于 6 时:

  1. 发生QueueFull异常。
  2. 创建 5 个工人。
  3. 工作人员出于其他目的从消息中提取元数据。
  4. await queue.join() 阻塞,直到队列中的所有项目都被获取和处理。
  5. 结束 -> 删除工人。

代码有效,我到现在都没有问题。

                nDownload = 0
                workers = []
                while (nDownload <= len(ids)):                        
                    try:
                        async for msg in get_messages(channel,ids=ids[nDownload]):
                           nDownload = nDownload + 1
                            try:   
                                queue.put_nowait(msg)
                            except (asyncio.QueueFull,IndexError) as qErr:
                                nDownload = nDownload - 1 
                                workers = [asyncio.create_task(worker(queue)) for _ in range(5)] 
                                await queue.join() 
                                for cancel in workers:
                                    cancel.cancel()                                    
                    except IndexError as iErr:
                        break    

问题: 有时消息有不同的大小。例如:

消息 1 = 8 分钟内下载 100MB

消息 2 = 5 秒内下载 1MB

一旦它下载了最短的消息(消息 2),我得到一个免费的 'slot' 队列。 不幸的是,我必须等待消息 1,因为 queue.join()

那时如何添加新项目到队列?

为什么我要使用 queue.join() ? 因为我不知道如何将最多 5 条消息添加到队列中,请等待下载并恢复 我真的需要下载一组消息,而不是一次全部下载 谢谢

编辑: 是的,我的工人是这样定义的(简化)

async def worker(queue):
while True:
    queue_msg = await queue.get()
    loop = asyncio.get_event_loop()
    try:
        task = loop.create_task(extract(queue_msg))
        await asyncio.wait_for(task, timeout=timeout)

    except errors.Fail:
  #Here I have to requeue the message when it fails,
  #so it requeues the ID in order to download the same msg later
     await queue.put(queue_msg.id)
    except asyncio.TimeoutError: 
     #requeue the msg etcc...

    finally:    
        queue.task_done()

你的回答很聪明,谢谢 但是我选择队列 'size > 1' 因为我需要在 'extract' 任务失败时重新获取消息。 (对不起我没告诉你) 我不知道如果队列大小 = 1 会发生什么并且我尝试添加项目。这个有点难

目前还不完全清楚你的约束是什么,但如果我理解正确的话:

  • 您最多要同时下载 5 个东西
  • 你不想浪费时间 - 一旦一个工人完成了一件物品,它就应该获得一个新的

队列大小应该与您的目的无关,它仅用作缓冲区,以防工作人员暂时快于 get_messages。我什至会从 1 的队列大小开始,并试验更大的值是否有助于提高性能。

QueueFull 上生成任务似乎很奇怪而且没有必要。处理生产者-消费者模式的惯用方法是创建固定数量的消费者,并让他们在多个项目到达时对其进行处理。您没有显示 worker,因此不清楚每个工作人员是只处理一条消息还是处理多条消息。

我会将循环重写为:

queue = asyncio.Queue(1)
workers = [asyncio.create_task(worker(queue)) for _ in range(5)]
for current in ids:
    async for msg in get_messages(channel, id=current):
        # enqueue msg, waiting (if needed) for a free slot in the queue
        await queue.put(msg)
# wait for the remaining enqueued items to be processed
await queue.join()
# cancel the now-idle workers, which wait for a new message
# that will never arrive
for w in workers:
    w.cancel()

工人可以这样定义:

async def worker(queue):
    while True:
        msg = await queue.get()
        ... process msg ...
        queue.task_done()