在循环内使用 Python 池

Using Python pool inside loop

我正在尝试使用大量数据进行一些计算。计算由简单的相关组成,但是,我的数据量很大,我盯着我的电脑看了 10 分钟以上,根本没有输出。

然后我尝试使用multiprocessing.Pool。这是我的代码:

from multiprocessing import Pool
from haversine import haversine

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: minimum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()


def correlation(type_1, type_2, dist):
    in_range = 0
    for l1 in type_2:      # Coordinates of a data in data_1
        for l2 in type_2:  # Coordinates of a data in data_2
            p1 = (float(l1[0]), float(l1[1]))
            p2 = (float(l2[0]), float(l2[1]))
            if haversine(p1, p2) <= dist:  # Distance between two data of types *i* and *j*
                in_range += 1              # Number of data in data_2 inside area of data in data_1
        total = float(len(type_1) * len(type_2))
        if total != 0:
            return in_range / total  # Correlation between category *i* and *j*

corr = calculateCorrelation(permiters_per_region, complaints_per_region, 20)

但是,速度并没有提高。似乎没有进行并行处理:

因为一个线程几乎集中了所有的工作。在某一时刻,所有 Python 个工作线程都使用了 CPU 的 0.0%,而一个线程使用了 100%。

我是不是漏掉了什么?

在生成作业的循环中,您调用 apply_async 然后等待它完成,从而有效地序列化工作。您可以将结果对象添加到队列中,并在所有分派工作完成后等待(见下文),甚至可以转到 map 方法。

def calculateCorrelation(data_1, data_2, dist):
    """
    Fill the correlation matrix between data_1 and data_2
    :param data_1: dictionary {key : [coordinates]}
    :param data_2: dictionary {key : [coordinates]}
    :param dist: minimum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category.
    """
    pool = Pool(processes=20)
    results = []

    data_1 = collections.OrderedDict(sorted(data_1.items()))
    data_2 = collections.OrderedDict(sorted(data_2.items()))
    data_1_size = len(data_1)                                          
    data_2_size = len(data_2)

    corr = numpy.zeros((data_1_size, data_2_size))

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            results.append((result, index_1, index_2))
    for result, index_1, index_2 in results:
        corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()