Python loops/other 可迭代对象的多处理

Python multiprocessing for loops/other iterables

我正在尝试找出一种方法来使用多处理包来减少处理我拥有的某些代码所需的时间。

基本上,我使用多个嵌套 for 循环完成了匹配计算,我想充分利用可用的 12 核处理器。我已经找到了一些关于 for 循环和多处理的文档和答案,但由于某种原因它似乎并没有在我的脑海中点击。无论如何...

我有两个大型数据框,我已将其转换为列表的列表,以便能够更轻松地迭代它们。

它们都遵循相同的格式,但具有不同的值 - 例如 DFs/lists 看起来像这样

TT 和 CT:

|user_id| hour1_avg | hour2_avg |....| hour24_avg| hour1_stdev | ... | hour24_stdev | 
|-------|-----------|-----------|----|-----------|-------------|-----|--------------|
| 12345 |   1.34    |   2.14    |----|   3.24    |    .942     |-----|      .834    | 
| 54321 |   2.14    |   3.10    |----|   6.26    |    .826     |-----|      .018    |

然后使用 .values.to_list().

将其转换为列表列表

TTL 和 CTL:

[[12345, 1.34, 2.14,...3.24,.942,....834],[54321, 2.14, 3.10,...6.26, .826,....018], [etc]]

我的代码遍历两个列表列表,计算每个小时值的计算结果,然后如果所有 24 小时都满足 [=] 中的条件,则将配对结果放入 pairs 列表中17=] 语句。那些不符合标准的人可以被踢出局。

pairs = [] #output for for loops

start_time = time.time()
for idx, a in enumerate(ttl): # iterate through primary list of list
    if idx % 12 != 0: #used to separate for 12 processors (0-11 to split processes manually)
        continue
    for b in ctl: # iterate through second list of list 
        i = 0
        tval_avg = [] # used to calculate average between computed variables in the loop
        for c in range(1,31): # iterate through hour avg and stdev 
            i += 1
            tval = np.absolute((a[c] - b[c])/np.sqrt((a[c+24]**2/31)+(b[c+24]**2/31))) 
            if math.isnan(tval) or tval > 2.04:
                break
            else:
                tval_avg.append(tval)
                if i == 24:  # checks to make sure each hour matches criteria to before being returned
                    pairs.append([a[0], b[0], a[2], a[3], np.mean(tval_avg)])
    if idx % 10 == 0 :
        print(idx) # check progress of loop
print("--- %s seconds ---" % (time.time() - start_time)) # show total time at the end

如果我在 spyder 中手动打开 12 个内核并将 0-11 分配给 if idx % 语句并 运行 它们(允许我使用更多处理器),这将起作用。我的目标是 运行 一个内核中的所有内容,使用多处理分配 12 个(或任何有效的)不同的 "jobs" - 每个处理器一个,并将结果放入单个数据帧中。这种类型的代码可能吗?如果是这样,我需要进行哪些类型的更改?

抱歉,如果这很复杂。如果需要,我很乐意进一步解释。

我在 SO 周围搜索了与我的特定问题类似的内容,但没有找到任何内容。我也无法理解多处理,以及如何将其应用于此特定场景,因此非常感谢您的帮助!

您的外循环结束 ttl。 将该循环主体中的代码移动到辅助函数中 接受 a 作为输入和 returns (tval_avg, pairs).

然后使用map重复调用那个助手。

return 元组将被序列化并发送回父进程。 您需要结合单个工蜂的结果 以获得与原始代码计算相同的结果。

或者,您可能更喜欢序列化助手的结果 到唯一命名的文件中。

这在我的大型 DF 笔记本上运行不到 1.5 分钟。不过,非多处理变体并没有慢多少。
编辑:实际上只有当阈值太高以至于没有(或很少)找到对时,这才是正确的。如果你得到很多对,ipc 开销会很大,以至于非多处理变体要快得多。至少对我来说。

我通过将过滤器从 >2.04 更改为 >20 来验证结果,这更适合我创建的统一样本。
我们的两种算法似乎都产生相同的对列表(一旦我确定了范围并删除了 idx % 12 部分)。

顺便说一句,我使用 tqdm 来可视化进度,这是一个非常方便的库。

import math

import pandas as pd
import numpy as np
import tqdm
import multiprocessing

avg_cols = [f"hour{i}_avg" for i in range(1, 25)]
stdev_cols = [f"hour{i}_stdev" for i in range(1, 25)]
columns = ["userid"] + avg_cols + stdev_cols
np.random.seed(23)
# threshod = 2.04
# rands_tt = np.random.rand(3000, 49)
# rands_ct = np.random.rand(112000, 49)
threshold = 20
rands_tt = np.random.rand(2, 49)
rands_ct = np.random.rand(10, 49)

multipliers = np.repeat([1000000, 5, 2], [1, 24, 24])[None, :]

TT = pd.DataFrame(data=rands_tt * multipliers, columns=columns)
CT = pd.DataFrame(data=rands_ct * multipliers, columns=columns)

pairs = []

tt_complete = TT.loc[:, columns].to_numpy()
ct_complete = CT.loc[:, columns].to_numpy()

avg = slice(1, 25)
stdev = slice(25, 49)
# do the **2/31 calculations only once
tt_complete[:, stdev] **= 2
tt_complete[:, stdev] /= 31

ct_complete[:, stdev] **= 2
ct_complete[:, stdev] /= 31


def find_pairs(tt_row):
    tvals = np.absolute(
        (tt_row[None, avg] - ct_complete[:, avg]) / np.sqrt(tt_row[None, stdev] + ct_complete[:, stdev])
    )

    # nan will propagate itself as max and when compared to 2.04 will return False
    valid_tval_idxs = np.where(tvals.max(axis=1) <= threshold)[0]
    mean_tvals = tvals.mean(axis=1)

    return [[tt_row[0], ct_complete[i, 0], tt_row[2], tt_row[3], mean_tvals[i]] for i in valid_tval_idxs]


# for tt_row in tqdm.tqdm(tt_complete):
#     pairs.extend(find_pairs(tt_row))


with multiprocessing.Pool(6) as pool:
    pairlist_iterable = pool.imap_unordered(find_pairs, tt_complete, chunksize=200)
    for pairlist in tqdm.tqdm(pairlist_iterable, total=len(tt_complete)):
        pairs.extend(pairlist)


ttl = TT.to_numpy().tolist()
ctl = CT.to_numpy().tolist()

pairs2 = []  # output for for loops

for idx, a in enumerate(ttl):  # iterate through primary list of list

    for b in ctl:  # iterate through second list of list
        i = 0
        tval_avg = []  # used to calculate average between computed variables in the loop
        for c in range(1, 25):  # iterate through hour avg and stdev
            i += 1
            tval = np.absolute((a[c] - b[c]) / np.sqrt((a[c + 24] ** 2 / 31) + (b[c + 24] ** 2 / 31)))
            if math.isnan(tval) or tval > threshold:
                break
            else:
                tval_avg.append(tval)
                if i == 24:  # checks to make sure each hour matches criteria to before being returned
                    pairs2.append([a[0], b[0], a[2], a[3], np.mean(tval_avg)])

print(pairs)   
print(pairs2)
print(pairs == pairs2)

输出为

100%|██████████| 2/2 [00:00<00:00, 2150.93it/s]
[[517297.88384658925, 878265.8552092713, 3.8272987969845347, 1.4119792198355636, 6.95265573421445]]
[[517297.88384658925, 878265.8552092713, 3.8272987969845347, 1.4119792198355636, 6.95265573421445]]
True