多处理同步
Multiprocessing Synchronization
假设我需要 运行 5 个并行进程,但进程 2 到 5 依赖于进程 1。我怎样才能确保进程 1 在其他进程之前 运行?我应该使用 Python 的 Multiprocessing Event() 还是 Lock() 或两者?
示例 1:
process 1
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
示例 2:
process 3
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
具有 2 个依赖项的示例 3:
process 1
process 2 or 3 (run in parallel after 1)
process 4
process 5 or 6 (run in parallel after 1 and after 4)
所有进程调用相同的函数 (msg),但所有 return 不同的值。
我需要一些指导,不一定是代码,如果你能提供,谢谢。
伪代码:
import Multiprocessing as mp
function(msg):
return 1 if msg == "one"
return 2 if msg == "two"
return 3 if msg == "three"
return 4 if msg == "four"
return 5 if msg == "five"
msgs = ['one', 'two', 'three', 'four', 'five']
jobs = []
for msg in msgs:
p = Process(target=function, args=(msg,))
p.start()
jobs.append(p)
for job in jobs:
job.join()
在这种情况下,所有进程都将 运行 无序。
如果我想要进程 1,我可以做:
可能的解决方案:
import Multiprocessing as mp
function(msg):
return 1 if msg == "one"
return 2 if msg == "two"
return 3 if msg == "three"
return 4 if msg == "four"
return 5 if msg == "five"
msg = ['one']
p1 = Process(target=function, args=(msg,))
p1.start()
p1.join()
msgs = ['two', 'three', 'four', 'five']
jobs = []
for msg in msgs:
p = Process(target=function, args=(msg,))
p.start()
jobs.append(p)
for job in jobs:
job.join()
有没有更好的解决办法或者这样就好了?它有效,但这并不意味着它不能以更好的方式完成(例如减少代码重复)。
不确定最后做了什么,但毕竟你可以使用 Event
s 来达到这个目的:
import multiprocessing as mp
def function(msg,events):
if msg == "one":
print(1)
events[0].set()
if msg == "two":
print("2 waiting")
events[0].wait()
events[1].wait()
print("2 done")
if msg == "three":
print(3)
events[1].set()
if msg == "four":
print(4)
if msg == "five":
print("5 waiting")
events[0].wait()
print("5 done")
if __name__ == '__main__':
events = [mp.Event(),mp.Event()]
jobs = []
for item in ['one','two','three','four','five']:
job = mp.Process(target=function, args=(item,events))
job.start()
jobs.append(job)
for job in jobs:
job.join()
这里特意引入了第二个依赖:p2同时依赖p1和p3(p5仍然依赖p1)。这样,如果你 运行 它几次,它会显示更多变化(比单一依赖):
python procy.py
2 waiting
4
1
5 waiting
5 done
3
2 done
python procy.py
1
5 waiting
2 waiting
4
5 done
3
2 done
python procy.py
1
4
3
5 waiting
5 done
2 waiting
2 done
假设我需要 运行 5 个并行进程,但进程 2 到 5 依赖于进程 1。我怎样才能确保进程 1 在其他进程之前 运行?我应该使用 Python 的 Multiprocessing Event() 还是 Lock() 或两者?
示例 1:
process 1
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
示例 2:
process 3
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
具有 2 个依赖项的示例 3:
process 1
process 2 or 3 (run in parallel after 1)
process 4
process 5 or 6 (run in parallel after 1 and after 4)
所有进程调用相同的函数 (msg),但所有 return 不同的值。
我需要一些指导,不一定是代码,如果你能提供,谢谢。
伪代码:
import Multiprocessing as mp
function(msg):
return 1 if msg == "one"
return 2 if msg == "two"
return 3 if msg == "three"
return 4 if msg == "four"
return 5 if msg == "five"
msgs = ['one', 'two', 'three', 'four', 'five']
jobs = []
for msg in msgs:
p = Process(target=function, args=(msg,))
p.start()
jobs.append(p)
for job in jobs:
job.join()
在这种情况下,所有进程都将 运行 无序。
如果我想要进程 1,我可以做:
可能的解决方案:
import Multiprocessing as mp
function(msg):
return 1 if msg == "one"
return 2 if msg == "two"
return 3 if msg == "three"
return 4 if msg == "four"
return 5 if msg == "five"
msg = ['one']
p1 = Process(target=function, args=(msg,))
p1.start()
p1.join()
msgs = ['two', 'three', 'four', 'five']
jobs = []
for msg in msgs:
p = Process(target=function, args=(msg,))
p.start()
jobs.append(p)
for job in jobs:
job.join()
有没有更好的解决办法或者这样就好了?它有效,但这并不意味着它不能以更好的方式完成(例如减少代码重复)。
不确定最后做了什么,但毕竟你可以使用 Event
s 来达到这个目的:
import multiprocessing as mp
def function(msg,events):
if msg == "one":
print(1)
events[0].set()
if msg == "two":
print("2 waiting")
events[0].wait()
events[1].wait()
print("2 done")
if msg == "three":
print(3)
events[1].set()
if msg == "four":
print(4)
if msg == "five":
print("5 waiting")
events[0].wait()
print("5 done")
if __name__ == '__main__':
events = [mp.Event(),mp.Event()]
jobs = []
for item in ['one','two','three','four','five']:
job = mp.Process(target=function, args=(item,events))
job.start()
jobs.append(job)
for job in jobs:
job.join()
这里特意引入了第二个依赖:p2同时依赖p1和p3(p5仍然依赖p1)。这样,如果你 运行 它几次,它会显示更多变化(比单一依赖):
python procy.py 2 waiting 4 1 5 waiting 5 done 3 2 done python procy.py 1 5 waiting 2 waiting 4 5 done 3 2 done python procy.py 1 4 3 5 waiting 5 done 2 waiting 2 done