Python: possible data loss in multiprocessing.Queue()

Suppose I have the following example, where I create a daemon process and try to communicate with it through an event flag:

from multiprocessing import Process, Event, Queue
import time

def reader(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]

    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()    # read from the queue
            output_queue.put(msg)      # copy to output_queue
            if msg == 'DONE':          # signal to stop
                e.set()                # signal that the worker is done


def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    input_queue = Queue()   # reader() reads from queue
                            # writer() writes to queue

    output_queue = Queue()


    e = Event()
    e.set()

    reader_p = Process(target=reader, args=((input_queue, e, output_queue),))
    reader_p.daemon = True
    reader_p.start()        # Launch reader() as a separate python process

    for count in [10**4, 10**5, 10**6]:
        _start = time.time()
        writer(count, input_queue)    # Send a lot of stuff to reader()

        e.clear()   # clear the event, signalling the worker to start
        e.wait()    # wait for the reader to finish

        # fetch results from output_queue:
        results = []
        while not output_queue.empty():
            results += [output_queue.get()]

        print(len(results)) # check how many results we have

        print("Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start)))

I use an input queue and an output queue; in this example the worker just copies the data to the output queue, and I fetch that data later in the program. Everything seems to work until the data length reaches 10k (is that actually the queue size limit, in bytes?), but when I try to copy more elements I get back a random number of results, far fewer than were sent:

10001
Sending 10000 numbers to Queue() took 0.4259309768676758 seconds
18857
Sending 100000 numbers to Queue() took 1.693629503250122 seconds
12439
Sending 1000000 numbers to Queue() took 10.592029809951782 seconds

10001
Sending 10000 numbers to Queue() took 0.41446948051452637 seconds
46615
Sending 100000 numbers to Queue() took 1.9259979724884033 seconds
18623
Sending 1000000 numbers to Queue() took 10.06524133682251 seconds

Update: now I have tried sharing the data among three workers. I checked that all of them are working, but the data loss did not stop:

import multiprocessing
from multiprocessing import Process, Event, Queue
import time

def reader(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]


    while True:
        if not e.is_set(): # if there is a signal to start
            #if not output_queue.empty(): # hangs for some reason
            msg = input_queue.get()    # read from the queue
            output_queue.put(msg)      # copy to output_queue
            #print("1")
            if msg == 'DONE':          # signal to stop
                e.set()                # signal that there is no more data
                print("done")


def reader1(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]


    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()    # read from the queue
            output_queue.put(msg)      # copy to output_queue
            #print("2")
            if msg == 'DONE':          # signal to stop
                e.set()                # signal that there is no more data
                print("done")


def reader2(data):
    input_queue = data[0]
    e = data[1]
    output_queue = data[2]

    while True:
        if not e.is_set(): # if there is a signal to start
            msg = input_queue.get()    # read from the queue
            output_queue.put(msg)      # copy to output_queue
            #print("3")
            if msg == 'DONE':          # signal to stop
                e.set()                # signal that there is no more data
                print("done")


def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':

    # I do not use manager, as it makes everything extremely slow
    #m = multiprocessing.Manager()
    #input_queue = m.Queue()

    input_queue = Queue()   # reader() reads from queue
                            # writer() writes to queue

    output_queue = Queue()


    e = Event()
    e.set()

    reader_p = Process(target=reader, args=((input_queue, e, output_queue),))
    reader_p.daemon = True
    reader_p.start()        # Launch reader() as a separate python process

    reader_p1 = Process(target=reader1, args=((input_queue, e, output_queue),))
    reader_p1.daemon = True
    reader_p1.start() 

    reader_p2 = Process(target=reader2, args=((input_queue, e, output_queue),))
    reader_p2.daemon = True
    reader_p2.start() 

    for count in [10**4, 10**5, 10**6]:
        _start = time.time()
        writer(count, input_queue)    # Send a lot of stuff to readers

        e.clear()   # clear the event, signalling the workers to start
        e.wait()    # wait for a reader to finish

        # fetch results from output_queue:
        results = []
        while not output_queue.empty():
            results += [output_queue.get()]

        print(len(results)) # check how many results we have

        print("Sending %s numbers to Queue() took %s seconds" % (count, 
            (time.time() - _start)))

As a result, sometimes the second stage completes correctly:

done
10001
Sending 10000 numbers to Queue() took 0.37468671798706055 seconds
done
18354
Sending 100000 numbers to Queue() took 1.2723915576934814 seconds
done
34807
Sending 1000000 numbers to Queue() took 9.1871018409729 seconds

done
10001
Sending 10000 numbers to Queue() took 0.37137532234191895 seconds
done
100001
Sending 100000 numbers to Queue() took 2.5747978687286377 seconds
done
217034
Sending 1000000 numbers to Queue() took 12.640174627304077 seconds

Queue size is indeed limited: in multiprocessing this limit is not reliable, and once it is reached, queue.put blocks until the queue is emptied. See the documentation for details: https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue
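A minimal sketch of that blocking behavior (the maxsize value here is an arbitrary choice for demonstration):

from multiprocessing import Queue
from queue import Full

q = Queue(maxsize=2)          # bounded queue; maxsize=2 is a demo value
q.put('a')
q.put('b')                    # the queue is now full
try:
    q.put('c', timeout=0.1)   # a plain q.put('c') would block until someone get()s
except Full:
    print("put() blocked: the queue is full")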

In your case, though, this is not the problem. You have simply chosen the wrong condition for deciding when to stop fetching results:

while not output_queue.empty():
    results += [output_queue.get()]

In your case, if the writers (here, your worker processes filling output_queue) are slower than the reader (and sometimes they will be), the queue can be momentarily empty even though the writers have not yet finished sending everything. That is why the number of results you read back is unstable.
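A more robust stop condition (my own sketch, not part of the original answer): the main process knows exactly how many items were sent, count numbers plus the 'DONE' sentinel, so it can block on get() for exactly that many items; a momentarily empty queue then just makes get() wait instead of ending the loop early:

# Sketch: fetch exactly as many items as were sent ('count' numbers + 'DONE').
# get() with no timeout blocks until an item arrives, so a momentarily
# empty output_queue cannot terminate the loop prematurely.
results = []
for _ in range(count + 1):
    results.append(output_queue.get())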

To confirm this, I replaced that condition with:

t0 = time.time()
while time.time() - t0 < 30:  # long enough to complete your loops, but just a demo condition; you should not use this
    try:
        # get(timeout=1) waits up to 1 second if the queue is momentarily
        # empty; if the queue stays empty for more than 1 second it raises
        # an exception, which here means the loop is complete. Again, this
        # is not a good condition for real code, it is just for testing.
        results += [output_queue.get(timeout=1)]
    except Exception as expt:
        break
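For real code, one cleaner option (again my own sketch, not part of the original answer) is to let the fetch loop stop on the same 'DONE' sentinel that reader() forwards into output_queue. Note that this only works as-is for the single-worker version; with several workers you would need one sentinel per worker:

# Sketch: stop fetching when the forwarded 'DONE' sentinel arrives.
# reader() copies 'DONE' into output_queue as its last item, so in the
# single-reader version it is guaranteed to come after all the numbers.
results = []
while True:
    msg = output_queue.get()   # blocks until an item is available
    if msg == 'DONE':
        break
    results.append(msg)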