AttributeError 'DupFd' in 'multiprocessing.resource_sharer' | Python multiprocessing + threading
I am trying to communicate between multiple threading.Thread(s) doing I/O-bound tasks and multiple multiprocessing.Process(es) doing CPU-bound tasks. Whenever a thread finds work for a process, it puts it on a multiprocessing.Queue, together with the sending end of a multiprocessing.Pipe(duplex=False). The processes then do their part and send the result back to the thread through the pipe. This seems to work in roughly 70% of cases; in the other 30% I receive AttributeError: Can't get attribute 'DupFd' on <module 'multiprocessing.resource_sharer' from '/usr/lib/python3.5/multiprocessing/resource_sharer.py'>
To reproduce:
import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name, pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name,
                              threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

work_queue = multiprocessing.Queue()
for i in range(0, 3):
    receive, send = multiprocessing.Pipe(duplex=False)
    t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
    t.daemon = True
    t.start()
for i in range(0, 2):
    p = multiprocessing.Process(target=process_work, args=[work_queue])
    p.daemon = True
    p.start()
time.sleep(5)
I looked at the multiprocessing source code but don't understand why this error occurs. I tried using a queue.Queue instead, or a pipe with duplex=True (the default), but couldn't find a pattern in when the error appears. Does anyone have an idea how to debug this?
You are forking your main process here while it is already multi-threaded. That is known to be generally problematic.
It is in fact problem prone (and not just in Python). The rule is "thread after you fork, not before". Otherwise, the locks used by the thread executor will get duplicated across processes. If one of those processes dies while it has the lock, all of the other processes using that lock will deadlock. - Raymond Hettinger
The trigger for the error you are getting is apparently that the duplication of the pipe's file descriptor fails inside the child process.
To resolve this, either create your child processes while your main process is still single-threaded, or use another start_method for creating new processes, like 'spawn' (the default on Windows) or 'forkserver', if available. A sketch of each option follows below.
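A minimal sketch of the first option, a hypothetical rearrangement of the script from the question: the worker processes are forked while the main process is still single-threaded, and the threads are only created afterwards.

import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name, pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name,
                              threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

work_queue = multiprocessing.Queue()

# fork first, while no threads exist yet ...
for i in range(0, 2):
    p = multiprocessing.Process(target=process_work, args=[work_queue])
    p.daemon = True
    p.start()

# ... then "thread after you fork, not before"
for i in range(0, 3):
    receive, send = multiprocessing.Pipe(duplex=False)
    t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
    t.daemon = True
    t.start()

time.sleep(5)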
forkserver
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.
Available on Unix platforms which support passing file descriptors over Unix pipes. docs
You can specify a different start_method with:
multiprocessing.set_start_method(method)
Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.
Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module. docs
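Applied to the script from the question, a minimal sketch of the second option might look like the following (assuming thread_work and process_work stay exactly as defined above; 'forkserver' is Unix-only, 'spawn' works everywhere):

import multiprocessing
import threading
import time

def thread_work(work_queue, pipe):
    while True:
        work_queue.put((threading.current_thread().name, pipe[1]))
        received = pipe[0].recv()
        print("{}: {}".format(threading.current_thread().name,
                              threading.current_thread().name == received))
        time.sleep(0.3)

def process_work(work_queue):
    while True:
        thread, pipe = work_queue.get()
        pipe.send(thread)

if __name__ == '__main__':
    # call at most once, before any Process is started
    multiprocessing.set_start_method('forkserver')  # or 'spawn'

    work_queue = multiprocessing.Queue()
    for i in range(0, 3):
        receive, send = multiprocessing.Pipe(duplex=False)
        t = threading.Thread(target=thread_work, args=[work_queue, (receive, send)])
        t.daemon = True
        t.start()
    for i in range(0, 2):
        p = multiprocessing.Process(target=process_work, args=[work_queue])
        p.daemon = True
        p.start()
    time.sleep(5)

With 'spawn' or 'forkserver' the main module gets re-imported in the children, which is why everything except the function definitions has to live under the if __name__ == '__main__' guard. If you prefer not to set the method globally, multiprocessing.get_context('forkserver') returns a context object exposing the same API.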
For benchmarks of the specific start_methods (on Ubuntu 18.04), check here.