如何根据对象的属性对多进程队列进行排序

How to sort a multiprocess queue based on an attribute of the object

对于常规列表,我可以根据对象属性对列表进行排序:

queue.sort(key=lambda weed: (weed.x_coord), reverse=True)

但是,对于多处理队列,这是不可能的,那么如何使用多处理队列完成相同的排序?或者如果我想让队列最终排序,最好避免使用多进程队列?

要求 queue/list 应该是线程安全和进程安全的,因为 queue/list 将由两个线程 运行 并行填充。

将对象插入共享队列的两个进程(p1 和 p2)将继续 运行 与从队列读取的第三个进程(状态机)一起(请参见下面的代码)。即状态机进程将不会等待p1和p2进程结束。

目前执行情况:

import multiprocessing

class Weed():
    x=None
    y=None
    def __init__(self,x,y):
        self.x=x
        self.y=y

def p1(q):
    """
    Function that inserts weed in the shared queue
    """
    # append squares of mylist to queue
    q.put(Weed(10.1,7.3))
    q.put(Weed(8.3,2.8))
    q.put(Weed(5.1,4.2))
    q.put(Weed(15.4,5.0))

def p2(q):
    """
    Function that inserts weed in the shared queue
    """
    # append squares of mylist to queue
    q.put(Weed(25.1,1))
    q.put(Weed(1.3,1))
    q.put(Weed(9.1,1))
    q.put(Weed(13.4,1))


def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    while not q.empty():
        q.sort(key=lambda x: (x.x), reverse=True) # Gives error - 
        print(q.get().x)
    print("Queue is now empty!")

if __name__ == "__main__":

    # creating multiprocessing Queue
    q = multiprocessing.Queue()

    # creating new processes
    p1 = multiprocessing.Process(target=p1, args=(q,))
    p2 = multiprocessing.Process(target=p2, args=(q,))
    p3 = multiprocessing.Process(target=state_machine, args=(q,))

    # running process p1 to generate some weeds
    p1.start()


    # running process p2 to generate some weeds
    p2.start()


    # running process p3 to sort the weed queue (by x coord.) and print them out
    p3.start()


    p1.join()
    p2.join()
    p3.join()

在您的示例中,这 3 个进程不会 运行 并发(您启动它们并在启动下一个之前加入它们),我假设真实世界的情况确实具有并发性。

但请注意:在实际情况下,空队列并不意味着其他任务已完成。您将需要另一种同步机制。

我的建议是回退到 state_machine 函数内的常规列表,并在元素到达时将元素从多处理队列传输到列表。然后您可以对列表进行排序并按顺序打印元素。您不会有并发问题,因为内部列表仅由线程 运行ning state_machine.

修改
def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    internal_queue = []
    # here, we assume that all tasks have finished working.
    # if it is not the case, you should add a barrier to wait
    while not q.empty():
        internal_queue.append(q.get())

    internal_queue.sort(key=lambda item: (item.x), reverse=True)
    for elt in internal_queue:
        print(elt.x)
    print("Queue is now empty!")

程序打印:

Queue elements:
25.1
15.4
13.4
10.1
9.1
8.3
5.1
1.3
Queue is now empty!

[编辑]

在真实场景中,您不希望在开始打印之前等待消费者完成。但是,您必须在两个问题之间找到折衷方案:

  • 如果您在开始消费元素之前等待的时间过长,您基本上会回到等待生产者完成的状态。
  • 如果你消耗队列元素的速度太快(即它们一到就打印),你的队列大部分时间会是空的,排序就没有多大意义了。

那里没有(恕我直言)最佳解决方案,这里有一个建议,定期更新内部队列,同时为生产者留出时间完成他们的工作:

def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    internal_queue = []
    def update_internal_queue():
        while not q.empty():
            internal_queue.append(q.get())
        internal_queue.sort(key=lambda item: (item.x), reverse=True)


    # wait a bit for the beginning of the elements to arrive
    time.sleep(5)
    update_internal_queue()
    print("Queue elements:")

    while internal_queue:
        time.sleep(1) # we can optionally wait a bit before each print
        update_internal_queue() # see if other elements arrived in the meantime
        print(internal_queue.pop(0).x)
    print("Queue is now empty!")