multiprocessing.Queue fails intermittently. Bug in Python?

Python's multiprocessing.Queue fails intermittently, and I don't know why. Is this a bug in Python or in my script?

Minimal failing script

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I'm testing this with CPython 3.6.6 on Debian. It also fails with docker's python:3.7.0-alpine image:

docker run --rm -v "${PWD}/test.py:/test.py" \
    python:3-alpine python3 /test.py

The script above sometimes fails with a BrokenPipeError.

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Test harness

Because this is intermittent, I wrote a shell script to invoke the test many times and count the failures.

#!/bin/sh
total=10

successes=0
for i in $(seq "${total}")
do
    if ! docker run --rm -v "${PWD}/test.py:/test.py" python:3-alpine \
         python3 /test.py 2>&1 \
         | grep --silent BrokenPipeError
    then
        successes=$(expr ${successes} + 1)
    fi
done
python3 -c "print(${successes} / ${total})"

This usually prints some fraction, maybe 0.2, indicating intermittent failures.

Timing adjustments

If I insert a time.sleep(0.01) before queue.close(), it works consistently. I noticed in the source code that writes happen on their own thread. I believe the error occurs when that writer thread is still trying to write data while all of the other threads have closed the queue.
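
Concretely, the workaround is just the commented-out sleeps above, enabled, e.g. in the worker (and likewise before the main process's queue.close(); the 10 ms value is arbitrary and presumably just needs to outlast the feeder thread's write):

def worker(queue):
    queue.put('abcdefghijklmnop')
    time.sleep(0.01)  # give the feeder thread time to flush to the pipe
    queue.close()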

Debug logs

By uncommenting the first few lines, I can trace the execution for failures and successes.

Failure:

[DEBUG/MainProcess] created semlock with handle 140480257941504
[DEBUG/MainProcess] created semlock with handle 140480257937408
[DEBUG/MainProcess] created semlock with handle 140480257933312
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

"Success"(真正无声的失败,只能用Python 3.6复制):

[DEBUG/MainProcess] created semlock with handle 139710276231168
[DEBUG/MainProcess] created semlock with handle 139710276227072
[DEBUG/MainProcess] created semlock with handle 139710276222976
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[INFO/Process-1] error in queue thread: [Errno 32] Broken pipe
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

True success (with time.sleep(0.01)):

[DEBUG/MainProcess] created semlock with handle 140283921616896
[DEBUG/MainProcess] created semlock with handle 140283921612800
[DEBUG/MainProcess] created semlock with handle 140283921608704
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

The difference seems to be that, in the truly successful case, the feeder thread receives the sentinel object before the atexit handlers run.

Answer

The main problem with your code is that nobody consumes what your worker process puts on the queue. Python queues expect the data in the queue to be consumed ("flushed to the pipe") before the process that put it there is killed.

From that perspective, your example doesn't make much sense, but if you want to make it work:

The key is queue.cancel_join_thread() -- https://docs.python.org/3/library/multiprocessing.html

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.
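
To illustrate that last note, here is a sketch of the manager-based variant (my addition, not part of the original answer): put() on a manager queue is a synchronous call into the manager's server process, so there is no per-process feeder thread to race with close() or with process exit.

import multiprocessing

def worker(queue):
    # put() on a manager queue proxy is a synchronous round trip to the
    # manager process, so nothing is left buffered when this process exits.
    queue.put('abcdefghijklmnop')

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        queue = manager.Queue(maxsize=10)
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()
        proc.join()
        print(queue.get())  # the item survives the child's exit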

The warning quoted above is the relevant bit. The problem is that items are put on the queue from the child process but never consumed by anyone. In that case, cancel_join_thread must be called in the CHILD process before you ask it to join. This code sample eliminates the error:

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

    # Ideally this would not be hard-coded here; it would instead be a
    # response to a signal (or other IPC message) from the main process.
    queue.cancel_join_thread()


proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()

proc.join()

I didn't bother with the IPC for this since there's no consumer at all, but I hope the idea is clear.
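
For comparison, here is a sketch of the consumer-based alternative (again my addition, not from the original post): if the parent drains the queue before joining, the feeder thread gets to flush its data, and neither the sleep nor cancel_join_thread() is needed.

import multiprocessing

def worker(queue):
    queue.put('abcdefghijklmnop')
    queue.close()  # no more puts from this process

if __name__ == '__main__':
    queue = multiprocessing.Queue(maxsize=10)
    proc = multiprocessing.Process(target=worker, args=(queue,))
    proc.start()
    item = queue.get()  # blocks until the feeder thread has written the item
    proc.join()         # safe now: no buffered data is left in the child
    print(item)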