如何在 openpyxl 对象中使用多进程?

How to use multiprocess in openpyxl object?

我需要处理一个 excel,里面有很多 sheet,但是每个 sheet 都有很大的 data.If 使用 openpyxl 加载这个 excel ,那会花很多时间,所以我想通过多进程分析每个sheet。

像这样的简短代码:

import multiprocessing as mp
import openpyxl
def LoadEx():
        wb=openpyxl.load_workbook('example.xlsx')
        sheetnames=wb.get_sheet_names()
        return sheetnames, wb

def job(sheet,wb):
    gs=wb.get_sheet_by_name(sheet)
    for i in range(10):
        if gs.cell(row=i,column=2).value=='Target':
            gs.cell(row=i,column=3).value='OK'

if __name__=='__main__':
    sheetnames,wb=LoadEx()
    pool=mp.Pool()
    for sheetname in sheetnames:
        res=pool.apply_async(job, (sheetname,wb))
    pool.close()
    pool.join()
    wb.save('example_output.xlsx')

但是,文件 'example_output.xlsx' 似乎没有保存 job() 的结果, 在这种情况下我应该怎么做才能获得多进程的效果? 也许有人可以帮助我,thinks

不幸的是,工作簿不适合多处理,因为有很多共享状态。

您可以使用 multiprocessing 完成,但您必须 支付 费用。

  1. global wb 成为您每 process 使用的 副本 。 因此使用 4 processes,您的 内存 必须足够大以容纳 4 个工作簿副本。

  2. 鉴于wb是一个副本,你的修改属于这个副本。 您必须将更改复制到 one 工作簿中。 复制工作表可能很耗时。

为了克服 Pickling Error,我从排队 ws 改为 wsDiff。 不是写入 ws 副本,而是将更改汇总到 wsDiff。 作为奖励,复制到目标 wb 会更快。

Time table: cpu_count=2, 10 Worksheets, workload: def ws_job(...

Job Processes   without mp           2               4  
Time          0:00:21.260746  0:00:10.214942  0:00:07.097369  

这个工作示例将适合给定的问题 def job。 ,例如:

import multiprocessing as mp
import queue, os, time
import random as rd
import openpyxl


class wsDiff(object):
    def __init__(self, row, column, value):
        self.row = row
        self.column = column
        self.value = value


def ws_job(wb, ws_idx):
    ws = wb.worksheets[ws_idx]
    print('pid %s: process (%s)' % (os.getpid(), ws.title))

    # *** DO SOME STUFF HERE***
    # Simulate workload
    time.sleep(rd.randrange(1, 4))

    diff = []
    for i in range(1, 11):
        if ws.cell(row=i, column=2).value == 'Target':
            #ws.cell(row=i, column=3).value = 'OK'
            diff.append( wsDiff(i, 3, 'OK') )

    return diff


def job(fq, q, wb):
    while True:
        try:
            ws_idx = fq.get_nowait()
        except queue.Empty:
            print('pid %s: exit job' % os.getpid())
            exit(0)

        q.put((ws_job(wb, ws_idx), ws_idx))
        time.sleep(0.1)


def writer(q, wb):
    print('start writer_handler')
    while True:
        try:
            diff, i_ws = q.get()
        except ValueError:
            print('writer ValueError exit(1)')
            exit(1)

        if diff == None:
            wb.save('../test/example_output.xlsx')
            exit(0)

        ws = wb.worksheets[i_ws]
        print('pid %s: update sheet %s from diff' % (os.getpid(), ws.title))
        for d in diff:
            ws.cell(row=d.row, column=d.column).value = d.value


def mpRun():
    wb = openpyxl.load_workbook('../test/example.xlsx')

    f_q = mp.Queue()
    for i in range(len(wb.worksheets)):
        f_q.put(i)

    w_q = mp.Queue()
    w_p = mp.Process(target=writer, args=(w_q, wb))
    w_p.start()
    time.sleep(0.1)

    pool = [mp.Process(target=job, args=(f_q, w_q, wb)) for p in range(os.cpu_count() + 2)]
    for p in pool:
        p.start()
        time.sleep(0.1)

    for p in pool:
        p.join()

    time.sleep(0.2)
    # Terminate Process w_p after all Sheets done
    w_q.put((None, None))
    w_p.join()

    print('EXIT __main__')

测试 Python:3.4.2 - openpyxl:2.4.1 - LibreOffice:4.3.3.2