在 python 中使用多处理加速 IO

Using multiprocessing for speeding up IO in python

我有一些代码可以读取多个数据集,由(称之为)"year" 键入,然后对它们进行一些连接。我试图通过并行化问题的 "read" 部分来加速代码。我通过编写这个函数来做到这一点:

现在,这段代码确实并行启动了多个进程,并且每个进程都很快完成,但总体运行时间最终比简单地串行读取要慢。

我做错了什么?

def parallelQueueRead():                                                         
  start_time = timeit.default_timer()                                             
  myq = Queue()

  def reader(year, q):                                                            
    loc_start_time = timeit.default_timer()                                        
    print("reading year %s" % (year))                                                                                      
    astore = store(year)                                  
    df = astore.getAllData(TESTSPEC)                                               
    astore.close()                                                                 
    q.put((year, df))                                                              
    print("finished reading year %s ,took: %s" %                                   
      (year, str(timeit.default_timer() - loc_start_time)))

  processes = [Process(target = reader, args = (y, myq)) for y in CHUNKS ]        
  for p in processes:                                                             
    p.start()              

  results = [ myq.get() for p in processes ]                                      
  results = sorted(results, key = lambda x: x[0])     

  print("parallel read took: " + str(timeit.default_timer() - start_time))

输出:

reading year 2011
reading year 2012
reading year 2013
reading year 2014
reading year 2015
finished reading year 2011 ,took: 1.142295703291893
finished reading year 2014 ,took: 1.2605517469346523
finished reading year 2013 ,took: 1.2637327639386058
finished reading year 2012 ,took: 1.2874943045899272
finished reading year 2015 ,took: 1.7436037007719278
parallel read took: 5.500953913666308

仅在一个进程中连续执行相同操作的另一个例程的输出:

serial read took: 5.3680868614465

Post-脚本 1

澄清一下:串行版本是一个简单的 for 循环:

results = [] 
for year in CHUNKS:
    results += [ astore.getAllData(TESTSPEC) ]

Post-脚本 2

阅读文档后,我认为并行版本速度慢的原因是 pickle 了一个大数据集(reader 的结果)。执行此操作所花费的时间包含在每个拣货员报告的时间中(此外,取消拣选结果所花费的时间也包含在总时间中)。

这对我来说真是个坏消息,因为这意味着多处理无法加速我的代码的执行。

根据 df 中的数据结构(astore.getAllData(TESTSPEC) 的结果),您可以尝试使用 sharedctypes 将收集到的数据存储在共享内存中。当然,此方法主要对“POD”有用 - data-only 结构中没有任何代码或复杂对象。

此外,我会将整个数据处理转移给子进程,并确保 astore 实际上能够并行工作 w/o 在客户端(不同进程)之间同步(或至少最小化同步时间) .

但当然所有这些建议都是基于 'common sense' - 如果不了解您的应用程序内部结构和准确的分析,就很难准确地说出什么是最适合您的解决方案