multiprocessing.Process doesn't terminate after putting requests response.content to queue
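I'm trying to run multiple API requests in parallel using multiprocessing.Process and requests. I put the URLs to fetch into a JoinableQueue instance and put the response content back into a Queue instance. I've noticed that putting response.content into the queue somehow prevents the process from terminating.
Here is a simplified example with only one process (Python 3.5):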
```python
import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, qin, qout):
        super().__init__()
        self.qin = qin    # JoinableQueue of URLs to fetch
        self.qout = qout  # Queue that receives the response bodies
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())
```
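After running the code I get:
```
Infinite loop terminates
ChildProcess-1 True
```
Please help me understand why the process doesn't terminate after the run function exits.
Update: added a print statement showing that the loop terminates.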
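Add a call to w.terminate() above the print statement.
As for why the process doesn't terminate on its own: your function's code is an infinite loop, so it never returns. Calling terminate() tells the process to terminate.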
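A minimal sketch of the terminate() approach, using a hypothetical `producer` that puts a single 1 MiB payload (assumed to be larger than the OS pipe buffer) which nobody reads. The multiprocessing documentation warns that terminating a process that is using a queue can corrupt that queue, so anything the child had not yet flushed should be treated as lost.

```python
import multiprocessing as mp


def producer(q, nbytes):
    # Hypothetical producer: one large put(), then the target returns.
    q.put(b"x" * nbytes)


def demo(nbytes=1 << 20, wait=1.0):
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q, nbytes))
    p.start()
    p.join(timeout=wait)  # nobody reads q, so the child is still alive here
    stuck = p.is_alive()
    p.terminate()         # SIGTERM on POSIX, TerminateProcess() on Windows
    p.join()
    # Do not read q afterwards: the killed child may have left a partial message.
    return stuck, p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(demo())  # on POSIX the exit code is -signal.SIGTERM
```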
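This is a bit hard to figure out from the Queue documentation; I ran into the same problem.
The key concept here is that before a producer process terminates, it joins the background (feeder) thread of every queue it has put data into. That join blocks until the feeder thread finishes, which only happens once all of its buffered data has been written to the underlying pipe; for anything larger than the pipe's buffer, that means the data has to be read out of the queue. So basically, before your ChildProcess can exit, someone has to consume everything it put into the queue!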
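The mechanism can be reproduced without requests or network access. This sketch assumes a hypothetical `producer` that puts one 1 MiB bytes object, which should exceed a typical pipe buffer (often 64 KiB on Linux): the child stays alive after its target returns, and exits once the parent reads the item.

```python
import multiprocessing as mp


def producer(q, nbytes):
    # put() only hands the object to the queue's feeder thread, which writes
    # it to an OS pipe. The target returns right away, but the process must
    # join that thread before exiting, and the thread blocks on a full pipe.
    q.put(b"x" * nbytes)


def demo(nbytes=1 << 20, wait=1.0):
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q, nbytes))
    p.start()
    p.join(timeout=wait)        # nothing has been read from q yet
    alive_before_drain = p.is_alive()
    data = q.get(timeout=10)    # draining the queue lets the feeder thread finish
    p.join(timeout=10)
    return alive_before_drain, p.is_alive(), len(data)


if __name__ == "__main__":
    print(demo())
```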
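There is some documentation for the Queue.cancel_join_thread function that should get around this, but I couldn't get it to have any effect; maybe I wasn't using it correctly.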
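A sketch of how `cancel_join_thread()` can be used, under the same assumptions (hypothetical `producer`, 1 MiB payload). The call has to happen in the process that put the data, not in the parent. It lets the child exit without waiting for the feeder thread, but the documentation notes that this is likely to lose enqueued data, so the queue is not read afterwards.

```python
import multiprocessing as mp


def producer(q, nbytes):
    q.put(b"x" * nbytes)
    # Called in the producing process: at exit, this process will no longer
    # join the queue's feeder thread. Whatever is still buffered is dropped.
    q.cancel_join_thread()


def demo(nbytes=1 << 20):
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q, nbytes))
    p.start()
    p.join(timeout=10)  # returns even though nobody has read q
    # Do not read q afterwards: the payload may have been cut off mid-message.
    return p.is_alive(), p.exitcode


if __name__ == "__main__":
    print(demo())
```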
Here is an example modification you could make that should solve the problem:
```python
# Uses the imports and the ChildProcess class from the question.
if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    while True:
        try:
            # Throw away the remaining items in qout (or process them, or whatever);
            # just get them out of the queue so the queue's background feeder thread
            # can finish, which in turn lets your ChildProcess exit.
            qout.get(True, 0.1)
        except queue.Empty:
            break
    w.join()  # Wait for your ChildProcess to finish up.
    # time.sleep(1)  # Not necessary since we've joined the ChildProcess.
    print(w.name, w.is_alive())
```
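A variant that collects the results instead of discarding them on a timeout, assuming the parent knows how many items it queued. It swaps `get(block=False)` for `None` stop sentinels, and `fake_fetch` is a hypothetical offline stand-in for `requests.get(url).content`. The key point is the same: read every result before calling `join()`.

```python
import multiprocessing as mp


def fake_fetch(url):
    # Hypothetical stand-in for requests.get(url).content so the sketch runs
    # offline; each payload is larger than a typical pipe buffer.
    return url.encode() * 10_000


def worker(qin, qout):
    # None is a stop sentinel. get(block=False) can raise queue.Empty shortly
    # after a put() because items reach the pipe asynchronously.
    for url in iter(qin.get, None):
        try:
            payload = fake_fetch(url)
        except Exception as exc:  # report failures too, so the counts still match
            payload = repr(exc)
        qout.put((url, payload))


def run(urls, nworkers=2):
    qin, qout = mp.Queue(), mp.Queue()
    for url in urls:
        qin.put(url)
    for _ in range(nworkers):
        qin.put(None)  # one sentinel per worker
    procs = [mp.Process(target=worker, args=(qin, qout)) for _ in range(nworkers)]
    for p in procs:
        p.start()
    # Read exactly one result per URL *before* joining, so each worker's
    # feeder thread can flush its buffer and the worker can exit.
    results = [qout.get() for _ in urls]
    for p in procs:
        p.join()
    return results, [p.exitcode for p in procs]


if __name__ == "__main__":
    results, codes = run(["http://example.invalid/%d" % i for i in range(4)])
    print(len(results), codes)
```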
As stated in the Pipes and Queues documentation:
> if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
> This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed.
> ...
> Note that a queue created using a manager does not have this issue.
If you switch to a manager queue, the process terminates successfully:
```python
import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, qin, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    manager = mp.Manager()
    qin = mp.JoinableQueue()
    qout = manager.Queue()  # proxy to a queue held by the manager's server process
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())
```
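A self-contained check of the manager behaviour, again with a hypothetical `producer` and a 1 MiB payload. Because a manager queue's `put()` sends the item to the manager's server process and waits for the reply, the child has no local buffer to flush and can exit before anyone reads the item.

```python
import multiprocessing as mp


def producer(q, nbytes):
    # q is a proxy: put() returns once the manager process has stored the
    # item, so there is no feeder thread for this process to join at exit.
    q.put(b"x" * nbytes)


def demo(nbytes=1 << 20):
    with mp.Manager() as manager:
        q = manager.Queue()
        p = mp.Process(target=producer, args=(q, nbytes))
        p.start()
        p.join(timeout=10)        # nothing has been read from q yet
        exited = not p.is_alive()
        data = q.get(timeout=10)  # the payload is still available afterwards
        return exited, p.exitcode, len(data)


if __name__ == "__main__":
    print(demo())
```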