使用 python 执行并行进程时的 Return 值
Return value while executing paralel processes with python
我一直在使用 python 的多处理,并且成功地使用了队列,但是在进程仍在执行时,我需要(从 main)监视一些变量。
我知道使用全局变量不是一个好的做法,但即使是这种方法也没有奏效。
谁能给我指出正确的方向?
提前致谢,
GC克鲁兹
附录:
我发布了一个我想做的简单示例:
import multiprocessing
import time
def sampleprocess(array, count):
'''process with heavy image processing in a loop'''
for i in range(count)
# Do processing on this array that outputs a given variable
sample_variable= count*10 # I would like to monitor this variable
if __name__ == '__main__':
p = multiprocessing.Process(target=sampleprocess, args=(array,1000,))
p.start()
# continuously monitor the dummy variable that is being computed on the process
while sample_variable < 1000
time.sleep(0.1)
print ' Still less than 1000'
一种选择是对共享数据对象使用多处理值和数组。
https://docs.python.org/2/library/multiprocessing.html#multiprocessing-managers
您的示例代码中的一个工作示例。如果你有不止一个
进程需要一个锁来同步写入。
from multiprocessing import Process, Value, Array, Lock
导入时间
def sampleprocess(s, count, lock):
'''process with heavy image processing in a loop'''
对于范围内的我(count.value):
# 对输出给定变量的数组进行处理
sample_variable= count*10 # 我要监控这个变量
with lock:
s.value= i*10
if name == 'main':
val = Value('i', 1000)
sample_variable = Value('i', 0)
lock = Lock()
p = Process(target=sampleprocess, args=(sample_variable, val, lock))
p.start()
# continuously monitor the dummy variable that is being computed on the process
while sample_variable.value < 1000:
time.sleep(0.1)
print ' Still less than 1000'
这里有一些程序可以演示如何实施解决方案:
示例 1
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
variable = Value('I')
p = Process(target=sample, args=(array, loops, variable))
p.start()
while variable.value < 1000:
print('Still less than 1000')
time.sleep(0.005)
print('Must be at least 1000')
p.join()
print('Value is', variable.value)
def sample(array, loops, variable):
for number in range(loops):
variable.value = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 2
from multiprocessing import *
import time
def main():
processes = 10
array, loops = list(range(1000)), 1000
shared = Array('I', processes)
p_array = []
for index in range(processes):
p = Process(target=sample, args=(array, loops, shared, index))
p.start()
p_array.append(p)
while True:
less_than_1000 = [p for p in enumerate(shared[:]) if p[1] < 1000]
if less_than_1000:
print(less_than_1000)
time.sleep(0.001)
else:
break
print('No process in less than 1000')
for p in p_array:
p.join()
print(shared[:])
def sample(array, loops, p_array, index):
time.sleep(1)
for number in range(loops):
time.sleep(0.001)
p_array[index] = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 3
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
with Manager() as manager:
variable = manager.Value('I', 0)
p = Process(target=sample, args=(array, loops, variable))
p.start()
while variable.value < 1000:
print('Still less than 1000')
time.sleep(0.005)
print('Must be at least 1000')
p.join()
print('Value is', variable.value)
def sample(array, loops, variable):
for number in range(loops):
variable.value = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 4
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
event = Event()
p = Process(target=sample, args=(array, loops, event))
p.start()
event.wait()
print('Must be at least 1000')
p.join()
def sample(array, loops, event):
for number in range(loops):
if number >= 100 and not event.is_set():
event.set()
time.sleep(0.001)
print('Sample is done')
if __name__ == '__main__':
main()
如您所见,有多种方法可以完成您要完成的任务。
我一直在使用 python 的多处理,并且成功地使用了队列,但是在进程仍在执行时,我需要(从 main)监视一些变量。
我知道使用全局变量不是一个好的做法,但即使是这种方法也没有奏效。
谁能给我指出正确的方向?
提前致谢,
GC克鲁兹
附录: 我发布了一个我想做的简单示例:
import multiprocessing
import time
def sampleprocess(array, count):
'''process with heavy image processing in a loop'''
for i in range(count)
# Do processing on this array that outputs a given variable
sample_variable= count*10 # I would like to monitor this variable
if __name__ == '__main__':
p = multiprocessing.Process(target=sampleprocess, args=(array,1000,))
p.start()
# continuously monitor the dummy variable that is being computed on the process
while sample_variable < 1000
time.sleep(0.1)
print ' Still less than 1000'
一种选择是对共享数据对象使用多处理值和数组。 https://docs.python.org/2/library/multiprocessing.html#multiprocessing-managers
您的示例代码中的一个工作示例。如果你有不止一个 进程需要一个锁来同步写入。
from multiprocessing import Process, Value, Array, Lock 导入时间
def sampleprocess(s, count, lock): '''process with heavy image processing in a loop''' 对于范围内的我(count.value): # 对输出给定变量的数组进行处理
sample_variable= count*10 # 我要监控这个变量
with lock:
s.value= i*10
if name == 'main':
val = Value('i', 1000)
sample_variable = Value('i', 0)
lock = Lock()
p = Process(target=sampleprocess, args=(sample_variable, val, lock))
p.start()
# continuously monitor the dummy variable that is being computed on the process
while sample_variable.value < 1000:
time.sleep(0.1)
print ' Still less than 1000'
这里有一些程序可以演示如何实施解决方案:
示例 1
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
variable = Value('I')
p = Process(target=sample, args=(array, loops, variable))
p.start()
while variable.value < 1000:
print('Still less than 1000')
time.sleep(0.005)
print('Must be at least 1000')
p.join()
print('Value is', variable.value)
def sample(array, loops, variable):
for number in range(loops):
variable.value = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 2
from multiprocessing import *
import time
def main():
processes = 10
array, loops = list(range(1000)), 1000
shared = Array('I', processes)
p_array = []
for index in range(processes):
p = Process(target=sample, args=(array, loops, shared, index))
p.start()
p_array.append(p)
while True:
less_than_1000 = [p for p in enumerate(shared[:]) if p[1] < 1000]
if less_than_1000:
print(less_than_1000)
time.sleep(0.001)
else:
break
print('No process in less than 1000')
for p in p_array:
p.join()
print(shared[:])
def sample(array, loops, p_array, index):
time.sleep(1)
for number in range(loops):
time.sleep(0.001)
p_array[index] = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 3
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
with Manager() as manager:
variable = manager.Value('I', 0)
p = Process(target=sample, args=(array, loops, variable))
p.start()
while variable.value < 1000:
print('Still less than 1000')
time.sleep(0.005)
print('Must be at least 1000')
p.join()
print('Value is', variable.value)
def sample(array, loops, variable):
for number in range(loops):
variable.value = number * 10
print('Sample is done')
if __name__ == '__main__':
main()
示例 4
from multiprocessing import *
import time
def main():
array, loops = list(range(1000)), 1000
event = Event()
p = Process(target=sample, args=(array, loops, event))
p.start()
event.wait()
print('Must be at least 1000')
p.join()
def sample(array, loops, event):
for number in range(loops):
if number >= 100 and not event.is_set():
event.set()
time.sleep(0.001)
print('Sample is done')
if __name__ == '__main__':
main()
如您所见,有多种方法可以完成您要完成的任务。