如何区分Multiprocessing.Pool中的进程?
How to distinguish processes in Multiprocessing.Pool?
我正在使用 python multiprocessing
将一些子进程派生到 运行 我的工作。有两个需求:
- 我需要知道子进程的 pid,以防我想杀掉它。
- 工作完成后我需要回调来做一些事情。因为那些东西在父进程中使用了锁,所以它不能在子进程中完成。
但我得到:
- 生成的进程
by multiprocessing.Process()
具有属性 "pid" 以获取其 pid。但是我不能添加我的异步回调,当然我也不能同步等待。
multiprocessing.Pool()
生成的进程池提供回调接口。但是我不知道池中的哪个进程是匹配我的工作的,因为我可能需要根据特定的工作来杀死进程。
任务很便宜,这里显示代码:
import random, time
import multiprocessing
import os
class Job(object):
def __init__(self, jobid, jobname, command):
self.jobid, self.jobname, self.command = jobid, jobname, command
def __str__(self):
return "Job <{0:05d}>".format(self.jobid)
def __repr__(self):
return self.__str__()
def _run_job(job):
time.sleep(1)
print "{} done".format(job)
return job, random.choice([True, False]) # the second argument indicates whether job has finished successfully
class Test(object):
def __init__(self):
self._loc = multiprocessing.Lock()
self._process_pool = multiprocessing.Pool()
def submit_job(self, job):
with self._loc:
self._process_pool.apply_async(_run_job, (job,), callback=self.job_done)
print "submitting {} successfully".format(job)
def job_done(self, result):
with self._loc:
# stuffs after job has finished is related to some cleanning work, so it needs the lock of the parent process
job, success = result
if success:
print "{} success".format(job)
else:
print "{} failure".format(job)
j1 = Job(1, "test1", "command1")
j2 = Job(2, "test2", "command2")
t = Test()
t.submit_job(j1)
t.submit_job(j2)
time.sleep(3.1) # wait for all jobs finishing
但是现在获取不到每个job对应的pid。比如我要kill作业<1>,但是在进程池中找不到哪个进程是和job<1>相关的,所以我想kill作业可能就不会了。
如果我换用multiprocessing.Process
,我可以记录每个进程的pid和对应的jobid。但是我现在无法添加回调方法。
那么有没有办法既获取子进程的pid又添加回调方法?
最后我找到了解决办法:改用multiprocessing.Event
。
由于multiprocessing.Pool
无法告诉我分配了哪个进程,所以我无法记录它以便我可以根据作业id随时杀死它。
幸运的是,multiprocessing
提供了 Event
对象作为回调方法的替代方法。回想一下回调方法的作用:它向子进程提供异步响应。一旦子进程完成,父进程可以检测到它并调用回调方法。所以核心问题是父进程如何检测子进程是否完成。那是 Event
的对象。
所以解决方案很简单:将一个 Event
对象传递给子进程。子进程完成后,它会设置 Event
对象。在父进程中,它启动一个守护线程来监视是否设置了事件。如果是这样,它可以调用执行这些回调操作的方法。此外,由于我使用 multiprocessing.Process
而不是 multiprocessing.Pool
创建进程,我可以轻松获取它的 PID,这使我能够杀死它。
解决代码:
import time
import multiprocessing
import threading
class Job(object):
def __init__(self, jobid, jobname, command):
self.jobid, self.jobname, self.command = jobid, jobname, command
self.lifetime = 0
def __str__(self):
return "Job <{0:05d}>".format(self.jobid)
def __repr__(self):
return self.__str__()
def _run_job(job, done_event):
time.sleep(1)
print "{} done".format(job)
done_event.set()
class Test(object):
def __init__(self):
self._loc = multiprocessing.Lock()
self._process_pool = {}
t = threading.Thread(target=self.scan_jobs)
t.daemon = True
t.start()
def scan_jobs(self):
while True:
with self._loc:
done_jobid = []
for jobid in self._process_pool:
process, event = self._process_pool[jobid]
if event.is_set():
print "Job<{}> is done in process <{}>".format(jobid, process.pid)
done_jobid.append(jobid)
map(self._process_pool.pop, done_jobid)
time.sleep(1)
def submit_job(self, job):
with self._loc:
done_event = multiprocessing.Event()
new_process = multiprocessing.Process(target=_run_host_job, args=(job, done_event))
new_process.daemon = True
self._process_pool[job.jobid] = (new_process, done_event)
new_process.start()
print "submitting {} successfully".format(job)
j1 = Job(1, "test1", "command1")
j2 = Job(2, "test2", "command2")
t = Test()
t.submit_job(j1)
t.submit_job(j2)
time.sleep(5) # wait for job to finish
我正在使用 python multiprocessing
将一些子进程派生到 运行 我的工作。有两个需求:
- 我需要知道子进程的 pid,以防我想杀掉它。
- 工作完成后我需要回调来做一些事情。因为那些东西在父进程中使用了锁,所以它不能在子进程中完成。
但我得到:
- 生成的进程
by multiprocessing.Process()
具有属性 "pid" 以获取其 pid。但是我不能添加我的异步回调,当然我也不能同步等待。 multiprocessing.Pool()
生成的进程池提供回调接口。但是我不知道池中的哪个进程是匹配我的工作的,因为我可能需要根据特定的工作来杀死进程。
任务很便宜,这里显示代码:
import random, time
import multiprocessing
import os
class Job(object):
def __init__(self, jobid, jobname, command):
self.jobid, self.jobname, self.command = jobid, jobname, command
def __str__(self):
return "Job <{0:05d}>".format(self.jobid)
def __repr__(self):
return self.__str__()
def _run_job(job):
time.sleep(1)
print "{} done".format(job)
return job, random.choice([True, False]) # the second argument indicates whether job has finished successfully
class Test(object):
def __init__(self):
self._loc = multiprocessing.Lock()
self._process_pool = multiprocessing.Pool()
def submit_job(self, job):
with self._loc:
self._process_pool.apply_async(_run_job, (job,), callback=self.job_done)
print "submitting {} successfully".format(job)
def job_done(self, result):
with self._loc:
# stuffs after job has finished is related to some cleanning work, so it needs the lock of the parent process
job, success = result
if success:
print "{} success".format(job)
else:
print "{} failure".format(job)
j1 = Job(1, "test1", "command1")
j2 = Job(2, "test2", "command2")
t = Test()
t.submit_job(j1)
t.submit_job(j2)
time.sleep(3.1) # wait for all jobs finishing
但是现在获取不到每个job对应的pid。比如我要kill作业<1>,但是在进程池中找不到哪个进程是和job<1>相关的,所以我想kill作业可能就不会了。
如果我换用multiprocessing.Process
,我可以记录每个进程的pid和对应的jobid。但是我现在无法添加回调方法。
那么有没有办法既获取子进程的pid又添加回调方法?
最后我找到了解决办法:改用multiprocessing.Event
。
由于multiprocessing.Pool
无法告诉我分配了哪个进程,所以我无法记录它以便我可以根据作业id随时杀死它。
幸运的是,multiprocessing
提供了 Event
对象作为回调方法的替代方法。回想一下回调方法的作用:它向子进程提供异步响应。一旦子进程完成,父进程可以检测到它并调用回调方法。所以核心问题是父进程如何检测子进程是否完成。那是 Event
的对象。
所以解决方案很简单:将一个 Event
对象传递给子进程。子进程完成后,它会设置 Event
对象。在父进程中,它启动一个守护线程来监视是否设置了事件。如果是这样,它可以调用执行这些回调操作的方法。此外,由于我使用 multiprocessing.Process
而不是 multiprocessing.Pool
创建进程,我可以轻松获取它的 PID,这使我能够杀死它。
解决代码:
import time
import multiprocessing
import threading
class Job(object):
def __init__(self, jobid, jobname, command):
self.jobid, self.jobname, self.command = jobid, jobname, command
self.lifetime = 0
def __str__(self):
return "Job <{0:05d}>".format(self.jobid)
def __repr__(self):
return self.__str__()
def _run_job(job, done_event):
time.sleep(1)
print "{} done".format(job)
done_event.set()
class Test(object):
def __init__(self):
self._loc = multiprocessing.Lock()
self._process_pool = {}
t = threading.Thread(target=self.scan_jobs)
t.daemon = True
t.start()
def scan_jobs(self):
while True:
with self._loc:
done_jobid = []
for jobid in self._process_pool:
process, event = self._process_pool[jobid]
if event.is_set():
print "Job<{}> is done in process <{}>".format(jobid, process.pid)
done_jobid.append(jobid)
map(self._process_pool.pop, done_jobid)
time.sleep(1)
def submit_job(self, job):
with self._loc:
done_event = multiprocessing.Event()
new_process = multiprocessing.Process(target=_run_host_job, args=(job, done_event))
new_process.daemon = True
self._process_pool[job.jobid] = (new_process, done_event)
new_process.start()
print "submitting {} successfully".format(job)
j1 = Job(1, "test1", "command1")
j2 = Job(2, "test2", "command2")
t = Test()
t.submit_job(j1)
t.submit_job(j2)
time.sleep(5) # wait for job to finish