Spider 的多个控制台比多处理更快?

Multiple consoles of Sypder are faster than mutliprocessing?

我正在 运行 对交易策略进行回测,定义为 class。我正在尝试 select 将参数的最佳组合输入到模型中,因此我 运行 在给定时间段内进行多次回测,尝试不同的组合。这个想法是为了能够 select 将第一代种群输入遗传算法。似乎是多处理的完美工作!

所以我尝试了很多东西,看看哪个更快。我打开了 10 个 Spyder 控制台(是的,我试过了)和 运行 每个控制台的单个参数组合(同时 运行ning)。

用于每个 Spyder 控制台的示例代码:

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

backtesting=[]
for day in days:
   backtesting_day=MyStrategy(day,single_parameter_combi)
   backtesting.append(backtesting_day)

然后我尝试了使用池的多处理方式。

多处理中使用的示例代码:

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

def single_run_backtesting(single_parameter_combi):
   backtesting=[]
   for day in days:
      backtesting_day=MyStrategy(day,single_parameter_combi)
      backtesting.append(backtesting_day)
   return backtesting

def backtest_many(list_of parameter_combinations):
   p=multiprocessing.pool()
   result=p.map(single_run_backtesting,list_of parameter_combinations)
   p.close()
   p.join()
   return result

if __name__ == '__main__':
   parameter_combis=[...] # a list of parameter combinations, 10 different ones in this case
   result = backtest_many(parameter_combis)

我还尝试了以下操作:打开 5 个 Spyder 控制台并 运行在 for 循环中 class 的 2 个实例,如下所示,以及一个带有 10 个实例的 Spyder 控制台class.

class MyStrategy(day,parameters):
   # my strategy that runs on a single day

parameter_combis=[...] # a list of parameter combinations

backtest_dict={k: [] for k in range(len(parameter_combis)} # make a dictionary of empty lists

for day in days:
   for j,single_parameter_combi in enumerate(parameter_combis):
      backtesting_day=MyStrategy(day,single_parameter_combi)
      backtest_dict[j].append(backtesting_day)

令我大吃一惊的是,多处理一天大约需要 25 分钟,而在 for 循环中有 10 个 class 实例的单个 Spyder 控制台大约需要相同的时间,并且神奇地我 运行 同时使用 10 个 Spyder 控制台只需要 15 分钟。我如何处理这些信息?这对我来说真的没有意义。我正在 运行 在 windows 10.

上安装一台 12-cpu 机器

考虑到我计划 运行 使用 96 核机器在 AWS 上做一些事情,大约 100 种参数组合交叉在遗传算法中,应该 运行 大约 20- 30 代(完整的回测是 2 个工作月 = 44 天)。

我的问题是:我错过了什么???最重要的是,这只是规模上的差异吗? 我知道,例如,如果您定义一个简单的平方函数并且 运行 它连续 100 次,那么多处理实际上比 for 循环慢。您开始看到大约 10000 次的优势,例如:https://github.com/vprusso/youtube_tutorials/blob/master/multiprocessing_and_threading/multiprocessing/multiprocessing_pool.py

当我使用多处理进行多达 100 次组合时,我是否会看到性能差异,如果是这种情况,有什么方法可以提前知道吗?我是否正确编写了代码?其他想法?如果我在一个单一的参数组合中使用多天多处理一步 "above",你认为它会显着加快速度吗?

扩展我的评论"Try p.imap_unordered().":

p.map() 确保您获得结果的顺序与它们在参数列表中的顺序相同。为了实现这一点,一些工人必须闲置一段时间 对于您的用例——本质上是参数组合的网格搜索——您真的不需要以相同的顺序排列它们,您只想以最佳选择结束。 (此外,quoth the documentation、"it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.")

p.imap_unordered(),相比之下,并不真正在意——它只是将事情排队,工作人员在空闲时处理它们。

也值得尝试使用 chunksize 参数 – 引用 imap() 文档,"For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1."(因为您花在排队和同步事物上的时间更少)。

最后,对于您的特定用例,您可能需要考虑让主进程使用生成器函数生成无限数量的参数组合,并在找到足够好的解决方案或经过足够的时间后中断循环.

执行此操作的一个简单函数和一个人为的问题(找到两个随机数 0..1 以最大化它们的总和)如下。记住也要 return worker 函数的原始参数集,否则你将无法访问它! :)

import random
import multiprocessing
import time


def find_best(*, param_iterable, worker_func, metric_func, max_time, chunksize=10):
    best_result = None
    best_metric = None
    start_time = time.time()
    n_results = 0
    with multiprocessing.Pool() as p:
        for result in p.imap_unordered(worker_func, param_iterable, chunksize=chunksize):
            n_results += 1
            elapsed_time = time.time() - start_time
            metric = metric_func(result)
            if best_metric is None or metric > best_metric:
                print(f'{elapsed_time}: Found new best solution, metric {metric}')
                best_metric = metric
                best_result = result

            if elapsed_time >= max_time:
                print(f'{elapsed_time}: Max time reached.')
                break
    final_time = time.time() - start_time
    print(f'Searched {n_results} results in {final_time} s.')
    return best_result

# ------------

def generate_parameter():
    return {'a': random.random(), 'b': random.random()}


def generate_parameters():
    while True:
        yield generate_parameter()


def my_worker(parameters):
    return {
        'parameters': parameters,  # remember to return this too!
        'value': parameters['a'] + parameters['b'],  # our maximizable metric
    }


def my_metric(result):
    return result['value']


def main():
    result = find_best(
        param_iterable=generate_parameters(),
        worker_func=my_worker,
        metric_func=my_metric,
        max_time=5,
    )
    print(f'Best result: {result}')


if __name__ == '__main__':
    main()

一个例子运行:

~/Desktop $ python3 so59357979.py
0.022627830505371094: Found new best solution, metric 0.5126700311039976
0.022940874099731445: Found new best solution, metric 0.9464256914062249
0.022969961166381836: Found new best solution, metric 1.2946600313637404
0.02298712730407715: Found new best solution, metric 1.6255217652861256
0.023016929626464844: Found new best solution, metric 1.7041449687571075
0.02303481101989746: Found new best solution, metric 1.8898109980050104
0.030200958251953125: Found new best solution, metric 1.9031436071918972
0.030324935913085938: Found new best solution, metric 1.9321951916206537
0.03880715370178223: Found new best solution, metric 1.9410837287942249
0.03970479965209961: Found new best solution, metric 1.9649277383314245
0.07829880714416504: Found new best solution, metric 1.9926667738329622
0.6105098724365234: Found new best solution, metric 1.997217792614364
5.000051021575928: Max time reached.
Searched 621931 results in 5.07216 s.
Best result: {'parameters': {'a': 0.997483, 'b': 0.999734}, 'value': 1.997217}

(顺便说一句,这比 chunksize=1 慢了将近 6 倍。)