Multiprocessing Synchronization

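Suppose I need to run 5 processes in parallel, but processes 2 through 5 depend on process 1. How can I make sure process 1 runs before the others? Should I use Python's multiprocessing Event(), Lock(), or both?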

Example 1:

process 1
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5
process 2 or 3 or 4 or 5

Example 2:

process 3
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5
process 1 or 2 or 4 or 5

Example 3, with 2 dependencies:

process 1
process 2 or 3 (run in parallel after 1)
process 4
process 5 or 6 (run in parallel after 1 and after 4)

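All processes call the same function (msg), but each one returns a different value.

I'm mainly looking for guidance, not necessarily code, though code would be appreciated if you can provide it.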

Simplified code:

import multiprocessing as mp

def function(msg):
    # Map each message to its number; Process discards the return value
    return {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5}.get(msg)

if __name__ == '__main__':
    msgs = ['one', 'two', 'three', 'four', 'five']

    # Start all five processes at once, then wait for every one of them
    jobs = []
    for msg in msgs:
        p = mp.Process(target=function, args=(msg,))
        p.start()
        jobs.append(p)

    for job in jobs:
        job.join()

In this case, the processes run in no particular order.

If I want process 1 to run first, I could do:

Possible solution:

import multiprocessing as mp

def function(msg):
    # Map each message to its number; Process discards the return value
    return {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5}.get(msg)

if __name__ == '__main__':
    # Run process 1 on its own and wait until it has finished
    p1 = mp.Process(target=function, args=('one',))
    p1.start()
    p1.join()

    # Only then start the remaining processes in parallel
    msgs = ['two', 'three', 'four', 'five']

    jobs = []
    for msg in msgs:
        p = mp.Process(target=function, args=(msg,))
        p.start()
        jobs.append(p)

    for job in jobs:
        job.join()

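Is there a better solution, or is this fine as it is? It works, but that doesn't mean it can't be done in a better way (for example, with less code duplication).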
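One way the duplication in the approach above could be reduced, sketched under the assumption that the dependencies can be grouped into stages that run strictly one after another. The helper names `run_stage` and `run_stages` are made up for this illustration and are not part of the multiprocessing module.

```python
import multiprocessing as mp


def function(msg):
    # Same mapping as in the question; Process discards the return value.
    return {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5}.get(msg)


def run_stage(msgs):
    """Start one process per message and block until all of them finish."""
    jobs = [mp.Process(target=function, args=(msg,)) for msg in msgs]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()
    # Exit code 0 means the process ended without an error.
    return [job.exitcode for job in jobs]


def run_stages(stages):
    """Run stages in order; processes inside one stage run in parallel."""
    return [run_stage(msgs) for msgs in stages]


if __name__ == "__main__":
    # Stage 1 ("one") must finish before stage 2 starts.
    print(run_stages([["one"], ["two", "three", "four", "five"]]))
```

Staging is simple but coarser than real dependencies: every process in a stage waits for the whole previous stage, even if it only depends on one member of it.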

Not sure what you ended up doing, but you can use Events for this purpose:

import multiprocessing as mp

def function(msg,events):
  if msg == "one":
    print(1)
    events[0].set()
  if msg == "two":
    print("2 waiting")
    events[0].wait()
    events[1].wait()
    print("2 done")
  if msg == "three":
    print(3)
    events[1].set()
  if msg == "four":
    print(4)
  if msg == "five":
    print("5 waiting")
    events[0].wait()
    print("5 done")

if __name__ == '__main__':
  events = [mp.Event(),mp.Event()]
  jobs = []
  for item in ['one','two','three','four','five']:
    job = mp.Process(target=function, args=(item,events))
    job.start()
    jobs.append(job)
  for job in jobs:
    job.join()

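A second dependency is introduced here on purpose: p2 depends on both p1 and p3 (p5 still depends only on p1). That way, if you run it a few times, the output shows more variation than it would with a single dependency: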

python procy.py
2 waiting
4
1
5 waiting
5 done
3
2 done

python procy.py
1
5 waiting
2 waiting
4
5 done
3
2 done

python procy.py
1
4
3
5 waiting
5 done
2 waiting
2 done
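
The same Event idea can be made table-driven so that the function body does not need one branch per process. This is only a sketch under a few assumptions: the `DEPENDS_ON` table is my reading of Example 3 in the question (process 4 is assumed to have no prerequisites), and `worker`, `run_all`, `DEPENDS_ON` and `VALUES` are names invented for the illustration. Each task gets its own "done" Event, waits on the Events of its prerequisites, and puts its return value on a Queue, since a Process target's return value is otherwise lost.

```python
import multiprocessing as mp

# Assumed dependency table for Example 3: a task may start its real work
# only after every task in its list has finished.
DEPENDS_ON = {
    "one": [],
    "two": ["one"],
    "three": ["one"],
    "four": [],
    "five": ["one", "four"],
    "six": ["one", "four"],
}
VALUES = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5, "six": 6}


def function(msg):
    return VALUES[msg]


def worker(msg, done, results):
    # Block until every prerequisite has signalled completion.
    for dep in DEPENDS_ON[msg]:
        done[dep].wait()
    results.put((msg, function(msg)))
    # Signal completion so that dependent tasks may proceed.
    done[msg].set()


def run_all():
    done = {msg: mp.Event() for msg in DEPENDS_ON}
    results = mp.Queue()
    jobs = [
        mp.Process(target=worker, args=(msg, done, results))
        for msg in DEPENDS_ON
    ]
    for job in jobs:
        job.start()
    # Read one result per process before joining, so no child is left
    # blocked on a full queue.
    collected = dict(results.get() for _ in jobs)
    for job in jobs:
        job.join()
    return collected


if __name__ == "__main__":
    print(run_all())
```

All processes are started up front and the Events alone enforce the order, so the table must not contain a cycle, otherwise the tasks involved wait on each other forever.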