The workers in ThreadPoolExecutor are not really daemon threads
What I can't figure out is that although ThreadPoolExecutor uses daemon threads as workers, they keep running even after the main thread exits.
Here is a minimal example on Python 3.6.4:
import concurrent.futures
import time

def fn():
    while True:
        time.sleep(5)
        print("Hello")

thread_pool = concurrent.futures.ThreadPoolExecutor()
thread_pool.submit(fn)

while True:
    time.sleep(1)
    print("Wow")
Both the main thread and the worker thread are infinite loops. So if I terminate the main thread with a KeyboardInterrupt, I expect the whole program to terminate as well. But in fact the worker thread keeps running, even though it is a daemon thread.
The source code of ThreadPoolExecutor confirms that the worker threads are daemon threads:
t = threading.Thread(target=_worker,
                     args=(weakref.ref(self, weakref_cb),
                           self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
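This can be checked at runtime by inspecting the pool's private _threads set. A quick sketch, with the caveat that _threads is an implementation detail and that Python 3.9+ changed workers to non-daemon threads, so the printed value depends on the interpreter version:

```python
import concurrent.futures

# Force the pool to create a worker, then inspect it. On Python
# 3.6-3.8 the worker reports daemon=True; 3.9+ made workers
# non-daemon threads, so this may print False there.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
pool.submit(lambda: None).result()   # a submit creates the worker
worker = next(iter(pool._threads))   # _threads is a private attribute
print(worker.daemon)
pool.shutdown()
```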
Moreover, if I create a daemon thread manually, it works like a charm:
from threading import Thread
import time

def fn():
    while True:
        time.sleep(5)
        print("Hello")

thread = Thread(target=fn)
thread.daemon = True
thread.start()

while True:
    time.sleep(1)
    print("Wow")
So I really can't understand this strange behavior.
Suddenly... I found the reason. According to more source code of ThreadPoolExecutor:
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join()

atexit.register(_python_exit)
There is an exit handler that joins all unfinished workers... That is how the problem above is avoided. A bad design can be beaten by another bad design. People write daemon=True only when they really know that the worker will not corrupt any objects or files.
In my case, I created a ThreadPoolExecutor with one worker, and after a single submit I simply removed the newly created thread from the registry, so that the interpreter would not wait for this thread to stop on its own. Note that worker threads are created after submit, not after the initialization of ThreadPoolExecutor.
import concurrent.futures.thread
from concurrent.futures import ThreadPoolExecutor

...

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(lambda: self._exec_file(args))
# Remove the worker from the module-level registry so that the
# atexit handler will not join it on interpreter exit.
del concurrent.futures.thread._threads_queues[list(executor._threads)[0]]
It works in Python 3.8, but may not work in 3.9+, since this code accesses private variables. See the working code on GitHub.
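If relying on private internals is a concern, a version-independent alternative is to skip the executor entirely and run the task on a plain daemon thread, which the interpreter will abandon at exit with no atexit handler involved. A sketch, where run_task and its arguments are hypothetical stand-ins for the original self._exec_file(args) call:

```python
import threading

def run_in_background(run_task, *args):
    # A plain daemon thread is not tracked by concurrent.futures'
    # exit handler, so the interpreter may exit without joining it.
    t = threading.Thread(target=run_task, args=args)
    t.daemon = True
    t.start()
    return t
```

The trade-off is losing the Future object and its result/exception plumbing, which may or may not matter depending on whether the caller ever inspects the submitted task's outcome.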