简单的多任务处理
Simple Multitasking
所以我有一堆功能,它们不依赖于彼此来完成它们的工作,而且每个功能都需要相当长的时间。所以我认为如果我可以使用多线程,我会在运行时安全。例如:
axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)
到目前为止我所有的变量都是列表(列表也很长)
我必须为每个输入文件执行此操作,如果超过 200 个,这将花费数小时...(我预计大约 1000+)
为了减少运行时间,我尝试尽可能少地检查计算数据(尤其是健全性检查),这有很大帮助,但下一个改进是为每组数据使用一个线程。
我试过这样的事情(过于简单化):
from multiprocessing import Pool
def calc_velocity(data, factor):
buffer_list = []
for index, line in enumerate(data):
buffer_list.append(data[index] * factor[index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
p = Pool(4)
axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))
和:
from multiprocessing import Process
def calc_velocity(data_pack):
data = []
factor = []
data.extend(data_pack[0])
factor.extend(data_pack[1])
buffer_list = []
for index, line in enumerate(data):
buffer_list.append(data[index] * factor[index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
data_pack = []
data_pack.append(data_axial)
data_pack.append(factors_axial)
p = Process(target = calc_velocity, args = data_pack)
p.start()
p.join()
print p
None 这些工作,但我不知道如何让它们工作。
当我想在python中进行多处理时我使用线程,下面的代码应该是在python中使用线程的示例:
from threading import Thread
import time
def time1(a, b):
print a
time.sleep(10)
print time.time(), a
return b
def time2(c, d):
print c
time.sleep(10)
print time.time(), c
return d
if __name__ == '__main__':
# target: the function name (pointer),
# args: a tuple of the arguments that you want to send to your function
t1 = Thread(target = time1, args=(1, 2))
t2 = Thread(target = time2, args=(3, 4))
# start the functions:
a = t1.start()
b = t2.start()
print a
print b
正如你在这段代码中看到的,线程不能return一个值,所以有两种方式
return线程中的一个值,一个:你可以将输出写入一个文件,然后读取文件一个try\except块,或者你可以将一个全局值更改为你想要的值return.如果你仍然想使用多处理,你可以在这里找到一些帮助:how to get the return value from a thread in python?
希望对您有所帮助。
您的第一个示例就快完成了。但是 Pool
不使用 arg
关键字。此外,Pool.map()
只允许您将单个参数传递给函数。要传递多个参数,您必须将它们打包到另一个结构中,例如元组,就像您在第二个示例中所做的那样。
您的第一个示例的修改版本有效。
from multiprocessing import Pool
def calc_velocity(work_args):
buffer_list = []
for index, line in enumerate(work_args[0]):
buffer_list.append(work_args[0][index] * work_args[1][index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
p = Pool(4)
work_args = (data_axial, factors_axial)
axial_velocity = p.map(calc_velocity, [work_args])
如果 calc_velocity
函数实际上代表了您的函数,那么您可以使用 numpy 的 multiply
函数使其更容易(更快)。您的 calc_velocity
函数将只是:
def calc_velocity(work_args):
return numpy.multiply(work_args[0], work_args[1])
如果您不需要在完成后立即得到结果,一个简单的 multiprocessing.Pool.map()
就足以将您的任务分成单独的进程以并行 运行,例如:
import multiprocessing
def worker(args): # a worker function invoked for each sub-process
data, factor = args[0], args[1] # Pool.map() sends a single argument so unpack them
return [e * factor[i] for i, e in enumerate(data)]
if __name__ == "__main__": # important process guard for cross-platform use
calc_pool = multiprocessing.Pool(processes=3) # we only need 3 processes
data = ( # pack our data for multiprocessing.Pool.map() ingestion
(data_axial, factors_axial),
(data_radial, factors_radial),
(data_circ, factors_circ)
)
# run our processes and await responses
axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)
但是,您问题中的相关部分暗示您有大量数据要传递 - 当 Python 使用多处理时,它不共享内存,至少在fork
的系统可以使用写时复制优化,在进程之间传递数据总是调用极其缓慢的 pickle-unpickle 例程来打包和发送数据。
因此,请确保您交换的数据量最少 - 例如,如果您从文件加载 data_axial
和 factors_axial
,最好只发送文件路径(s ) 并让 worker()
处理 load/parse 文件本身,而不是在主进程中加载文件,然后发送加载的数据。
如果您需要经常(随机)访问子流程中的大量(可变)共享数据,我建议您使用一些内存数据库来完成任务,例如 Redis.
所以我有一堆功能,它们不依赖于彼此来完成它们的工作,而且每个功能都需要相当长的时间。所以我认为如果我可以使用多线程,我会在运行时安全。例如:
axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)
到目前为止我所有的变量都是列表(列表也很长)
我必须为每个输入文件执行此操作,如果超过 200 个,这将花费数小时...(我预计大约 1000+)
为了减少运行时间,我尝试尽可能少地检查计算数据(尤其是健全性检查),这有很大帮助,但下一个改进是为每组数据使用一个线程。
我试过这样的事情(过于简单化):
from multiprocessing import Pool
def calc_velocity(data, factor):
buffer_list = []
for index, line in enumerate(data):
buffer_list.append(data[index] * factor[index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
p = Pool(4)
axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))
和:
from multiprocessing import Process
def calc_velocity(data_pack):
data = []
factor = []
data.extend(data_pack[0])
factor.extend(data_pack[1])
buffer_list = []
for index, line in enumerate(data):
buffer_list.append(data[index] * factor[index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
data_pack = []
data_pack.append(data_axial)
data_pack.append(factors_axial)
p = Process(target = calc_velocity, args = data_pack)
p.start()
p.join()
print p
None 这些工作,但我不知道如何让它们工作。
当我想在python中进行多处理时我使用线程,下面的代码应该是在python中使用线程的示例:
from threading import Thread
import time
def time1(a, b):
print a
time.sleep(10)
print time.time(), a
return b
def time2(c, d):
print c
time.sleep(10)
print time.time(), c
return d
if __name__ == '__main__':
# target: the function name (pointer),
# args: a tuple of the arguments that you want to send to your function
t1 = Thread(target = time1, args=(1, 2))
t2 = Thread(target = time2, args=(3, 4))
# start the functions:
a = t1.start()
b = t2.start()
print a
print b
正如你在这段代码中看到的,线程不能return一个值,所以有两种方式 return线程中的一个值,一个:你可以将输出写入一个文件,然后读取文件一个try\except块,或者你可以将一个全局值更改为你想要的值return.如果你仍然想使用多处理,你可以在这里找到一些帮助:how to get the return value from a thread in python?
希望对您有所帮助。
您的第一个示例就快完成了。但是 Pool
不使用 arg
关键字。此外,Pool.map()
只允许您将单个参数传递给函数。要传递多个参数,您必须将它们打包到另一个结构中,例如元组,就像您在第二个示例中所做的那样。
您的第一个示例的修改版本有效。
from multiprocessing import Pool
def calc_velocity(work_args):
buffer_list = []
for index, line in enumerate(work_args[0]):
buffer_list.append(work_args[0][index] * work_args[1][index])
return buffer_list
data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]
if __name__ == '__main__':
p = Pool(4)
work_args = (data_axial, factors_axial)
axial_velocity = p.map(calc_velocity, [work_args])
如果 calc_velocity
函数实际上代表了您的函数,那么您可以使用 numpy 的 multiply
函数使其更容易(更快)。您的 calc_velocity
函数将只是:
def calc_velocity(work_args):
return numpy.multiply(work_args[0], work_args[1])
如果您不需要在完成后立即得到结果,一个简单的 multiprocessing.Pool.map()
就足以将您的任务分成单独的进程以并行 运行,例如:
import multiprocessing
def worker(args): # a worker function invoked for each sub-process
data, factor = args[0], args[1] # Pool.map() sends a single argument so unpack them
return [e * factor[i] for i, e in enumerate(data)]
if __name__ == "__main__": # important process guard for cross-platform use
calc_pool = multiprocessing.Pool(processes=3) # we only need 3 processes
data = ( # pack our data for multiprocessing.Pool.map() ingestion
(data_axial, factors_axial),
(data_radial, factors_radial),
(data_circ, factors_circ)
)
# run our processes and await responses
axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)
但是,您问题中的相关部分暗示您有大量数据要传递 - 当 Python 使用多处理时,它不共享内存,至少在fork
的系统可以使用写时复制优化,在进程之间传递数据总是调用极其缓慢的 pickle-unpickle 例程来打包和发送数据。
因此,请确保您交换的数据量最少 - 例如,如果您从文件加载 data_axial
和 factors_axial
,最好只发送文件路径(s ) 并让 worker()
处理 load/parse 文件本身,而不是在主进程中加载文件,然后发送加载的数据。
如果您需要经常(随机)访问子流程中的大量(可变)共享数据,我建议您使用一些内存数据库来完成任务,例如 Redis.