使用 pool.map 将每个循环的 pandas 计算发送到不同的线程 (python3.6.5)

sending each looped pandas calculation to a different thread (python3.6.5) with pool.map

有了基本的 pandas 金融市场 OHLCV 数据的 df,我正在尝试向 df 添加大量计算列。大量的列和计算使得这个 SLOW SLOW SLOW! 尝试使用 pool.map 进行多进程处理,但一无所获。 理想情况下,循环的每次迭代都应发送到一个离散的线程。下面代码中的简化移动平均线。 显示简单的字典和滚动平均工作缓慢 类型错误:map() 缺少 1 个必需的位置参数:'iterable' 感谢大家的帮助-thx

import pandas as pd
from multiprocessing.dummy import Pool as ThreadPool

#####################################################
# DJIA_OHLCV_test.csv has format:
# Date,Open,High,Low,Close,Adj Close,Volume
# 
1/2/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
# 
1/3/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
DJIA = pd.read_csv('DJIA_OHLCV_test.csv')
"""
#####################################################
# # This works! please comment out to switch 
# MAdict = {'MA50':50, 'MA100':100, 'MA200':200} # Define Moving Average 
Windows

# for MAkey in MAdict:
#     DJIA[('ma' + MAkey)] = pd.Series.rolling(DJIA['Adj Close'], 
              window=MAdict[MAkey]).mean()

#####################################################
"""
# This doesn't work! please comment out to switch 
MAdict = {'MA50':50, 'MA100':100, 'MA200':200}
pool = ThreadPool(3)

def moving_average(MAkey):
    return pd.Series.rolling(DJIA['Adj Close'], window=MAdict[MAkey]).mean()

for MAkey in MAdict:
    DJIA[('ma' + MAkey)] = pool.map(moving_average(MAkey))

#####################################################
print(DJIA.tail())

pool.map 是一个阻塞调用,因此无需遍历 MAdict 并调用 pool.map,您需要将可迭代对象作为参数直接传递给 pool.map

import pandas as pd
from multiprocessing.dummy import Pool


def moving_average(ma):
    return pd.Series.rolling(djia['Adj Close'], window=ma).mean()


if __name__ == '__main__':

    N_WORKERS = 3
    MA_DICT = {'MA50':50, 'MA100':100, 'MA200':200}

    djia = pd.read_csv('DJIA_OHLCV_test.csv')

    with Pool(N_WORKERS) as pool:
        results = pool.map(moving_average, iterable=MA_DICT.values())

    # concatenate results and rename columns
    results = pd.concat(results, axis=1)
    results.columns = ['ma' + key for key in MA_DICT]

    djia = pd.concat([djia, results], axis=1)

    print(djia.tail())