简单的多任务处理

Simple Multitasking

所以我有一堆功能,它们不依赖于彼此来完成它们的工作,而且每个功能都需要相当长的时间。所以我认为如果我可以使用多线程,我会在运行时安全。例如:

axial_velocity = calc_velocity(data_axial, factors_axial)
radial_velocity = calc_velocity(data_radial, factors_radial)
circumferential_velocity = calc_velocity(data_circ, factors_circ)

到目前为止我所有的变量都是列表(列表也很长)

我必须为每个输入文件执行此操作,如果超过 200 个,这将花费数小时...(我预计大约 1000+)

为了减少运行时间,我尝试尽可能少地检查计算数据(尤其是健全性检查),这有很大帮助,但下一个改进是为每组数据使用一个线程。

我试过这样的事情(过于简单化):

from multiprocessing import Pool

def calc_velocity(data, factor):
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    axial_velocity = p.map(calc_velocity, args = (data_axial, factors_axial))

和:

from multiprocessing import Process


def calc_velocity(data_pack):
    data = []
    factor = []
    data.extend(data_pack[0])
    factor.extend(data_pack[1])
    buffer_list = []
    for index, line in enumerate(data):
        buffer_list.append(data[index] * factor[index])
    return buffer_list


data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    data_pack = []
    data_pack.append(data_axial)
    data_pack.append(factors_axial)
    p = Process(target = calc_velocity, args = data_pack)
    p.start()
    p.join()
    print p

None 这些工作,但我不知道如何让它们工作。

当我想在python中进行多处理时我使用线程,下面的代码应该是在python中使用线程的示例:

from threading import Thread
import time

def time1(a, b):
    print a
    time.sleep(10)
    print time.time(), a
    return b

def time2(c, d):
    print c
    time.sleep(10)
    print time.time(), c
    return d

if __name__ == '__main__':
    # target: the function name (pointer),
    # args: a tuple of the arguments that you want to send to your function
    t1 = Thread(target = time1, args=(1, 2))
    t2 = Thread(target = time2, args=(3, 4))

    # start the functions:
    a = t1.start()
    b = t2.start()
    print a
    print b

正如你在这段代码中看到的,线程不能return一个值,所以有两种方式 return线程中的一个值,一个:你可以将输出写入一个文件,然后读取文件一个try\except块,或者你可以将一个全局值更改为你想要的值return.如果你仍然想使用多处理,你可以在这里找到一些帮助:how to get the return value from a thread in python?

希望对您有所帮助。

您的第一个示例就快完成了。但是 Pool 不使用 arg 关键字。此外,Pool.map() 只允许您将单个参数传递给函数。要传递多个参数,您必须将它们打包到另一个结构中,例如元组,就像您在第二个示例中所做的那样。

您的第一个示例的修改版本有效。

from multiprocessing import Pool

def calc_velocity(work_args):
    buffer_list = []
    for index, line in enumerate(work_args[0]):
        buffer_list.append(work_args[0][index] * work_args[1][index])
    return buffer_list

data_axial = [1, 2, 3]
factors_axial = [3, 2, 1]

if __name__ == '__main__':
    p = Pool(4)
    work_args = (data_axial, factors_axial)
    axial_velocity = p.map(calc_velocity, [work_args])

如果 calc_velocity 函数实际上代表了您的函数,那么您可以使用 numpy 的 multiply 函数使其更容易(更快)。您的 calc_velocity 函数将只是:

def calc_velocity(work_args):
    return numpy.multiply(work_args[0], work_args[1])

如果您不需要在完成后立即得到结果,一个简单的 multiprocessing.Pool.map() 就足以将您的任务分成单独的进程以并行 运行,例如:

import multiprocessing

def worker(args):  # a worker function invoked for each sub-process
    data, factor = args[0], args[1]  # Pool.map() sends a single argument so unpack them
    return [e * factor[i] for i, e in enumerate(data)]

if __name__ == "__main__":  # important process guard for cross-platform use
    calc_pool = multiprocessing.Pool(processes=3)  # we only need 3 processes
    data = (  # pack our data for multiprocessing.Pool.map() ingestion
        (data_axial, factors_axial),
        (data_radial, factors_radial),
        (data_circ, factors_circ)
    )
    # run our processes and await responses
    axial_velocity, radial_velocity, circumferential_velocity = calc_pool.map(worker, data)

但是,您问题中的相关部分暗示您有大量数据要传递 - 当 Python 使用多处理时,它不共享内存,至少在fork 的系统可以使用写时复制优化,在进程之间传递数据总是调用极其缓慢的 pickle-unpickle 例程来打包和发送数据。

因此,请确保您交换的数据量最少 - 例如,如果您从文件加载 data_axialfactors_axial,最好只发送文件路径(s ) 并让 worker() 处理 load/parse 文件本身,而不是在主进程中加载​​文件,然后发送加载的数据。

如果您需要经常(随机)访问子流程中的大量(可变)共享数据,我建议您使用一些内存数据库来完成任务,例如 Redis.