如何在 openpyxl 对象中使用多进程?
How to use multiprocess in openpyxl object?
我需要处理一个 excel,里面有很多 sheet,但是每个 sheet 都有很大的 data.If 使用 openpyxl 加载这个 excel ,那会花很多时间,所以我想通过多进程分析每个sheet。
像这样的简短代码:
import multiprocessing as mp
import openpyxl
def LoadEx():
wb=openpyxl.load_workbook('example.xlsx')
sheetnames=wb.get_sheet_names()
return sheetnames, wb
def job(sheet,wb):
gs=wb.get_sheet_by_name(sheet)
for i in range(10):
if gs.cell(row=i,column=2).value=='Target':
gs.cell(row=i,column=3).value='OK'
if __name__=='__main__':
sheetnames,wb=LoadEx()
pool=mp.Pool()
for sheetname in sheetnames:
res=pool.apply_async(job, (sheetname,wb))
pool.close()
pool.join()
wb.save('example_output.xlsx')
但是,文件 'example_output.xlsx' 似乎没有保存 job() 的结果,
在这种情况下我应该怎么做才能获得多进程的效果?
也许有人可以帮助我,thinks
不幸的是,工作簿不适合多处理,因为有很多共享状态。
您可以使用 multiprocessing
完成,但您必须 支付 费用。
global wb
成为您每 process
使用的 副本 。
因此使用 4 processes
,您的 内存 必须足够大以容纳 4 个工作簿副本。
鉴于wb
是一个副本,你的修改属于这个副本。
您必须将更改复制到 one 工作簿中。
复制工作表可能很耗时。
为了克服 Pickling Error,我从排队 ws
改为 wsDiff
。
不是写入 ws
副本,而是将更改汇总到 wsDiff
。
作为奖励,复制到目标 wb
会更快。
Time table: cpu_count=2, 10 Worksheets, workload: def ws_job(...
Job Processes without mp 2 4
Time 0:00:21.260746 0:00:10.214942 0:00:07.097369
这个工作示例将适合给定的问题 def job
。
,例如:
import multiprocessing as mp
import queue, os, time
import random as rd
import openpyxl
class wsDiff(object):
def __init__(self, row, column, value):
self.row = row
self.column = column
self.value = value
def ws_job(wb, ws_idx):
ws = wb.worksheets[ws_idx]
print('pid %s: process (%s)' % (os.getpid(), ws.title))
# *** DO SOME STUFF HERE***
# Simulate workload
time.sleep(rd.randrange(1, 4))
diff = []
for i in range(1, 11):
if ws.cell(row=i, column=2).value == 'Target':
#ws.cell(row=i, column=3).value = 'OK'
diff.append( wsDiff(i, 3, 'OK') )
return diff
def job(fq, q, wb):
while True:
try:
ws_idx = fq.get_nowait()
except queue.Empty:
print('pid %s: exit job' % os.getpid())
exit(0)
q.put((ws_job(wb, ws_idx), ws_idx))
time.sleep(0.1)
def writer(q, wb):
print('start writer_handler')
while True:
try:
diff, i_ws = q.get()
except ValueError:
print('writer ValueError exit(1)')
exit(1)
if diff == None:
wb.save('../test/example_output.xlsx')
exit(0)
ws = wb.worksheets[i_ws]
print('pid %s: update sheet %s from diff' % (os.getpid(), ws.title))
for d in diff:
ws.cell(row=d.row, column=d.column).value = d.value
def mpRun():
wb = openpyxl.load_workbook('../test/example.xlsx')
f_q = mp.Queue()
for i in range(len(wb.worksheets)):
f_q.put(i)
w_q = mp.Queue()
w_p = mp.Process(target=writer, args=(w_q, wb))
w_p.start()
time.sleep(0.1)
pool = [mp.Process(target=job, args=(f_q, w_q, wb)) for p in range(os.cpu_count() + 2)]
for p in pool:
p.start()
time.sleep(0.1)
for p in pool:
p.join()
time.sleep(0.2)
# Terminate Process w_p after all Sheets done
w_q.put((None, None))
w_p.join()
print('EXIT __main__')
测试 Python:3.4.2 - openpyxl:2.4.1 - LibreOffice:4.3.3.2
我需要处理一个 excel,里面有很多 sheet,但是每个 sheet 都有很大的 data.If 使用 openpyxl 加载这个 excel ,那会花很多时间,所以我想通过多进程分析每个sheet。
像这样的简短代码:
import multiprocessing as mp
import openpyxl
def LoadEx():
wb=openpyxl.load_workbook('example.xlsx')
sheetnames=wb.get_sheet_names()
return sheetnames, wb
def job(sheet,wb):
gs=wb.get_sheet_by_name(sheet)
for i in range(10):
if gs.cell(row=i,column=2).value=='Target':
gs.cell(row=i,column=3).value='OK'
if __name__=='__main__':
sheetnames,wb=LoadEx()
pool=mp.Pool()
for sheetname in sheetnames:
res=pool.apply_async(job, (sheetname,wb))
pool.close()
pool.join()
wb.save('example_output.xlsx')
但是,文件 'example_output.xlsx' 似乎没有保存 job() 的结果, 在这种情况下我应该怎么做才能获得多进程的效果? 也许有人可以帮助我,thinks
不幸的是,工作簿不适合多处理,因为有很多共享状态。
您可以使用 multiprocessing
完成,但您必须 支付 费用。
global wb
成为您每process
使用的 副本 。 因此使用 4processes
,您的 内存 必须足够大以容纳 4 个工作簿副本。鉴于
wb
是一个副本,你的修改属于这个副本。 您必须将更改复制到 one 工作簿中。 复制工作表可能很耗时。
为了克服 Pickling Error,我从排队 ws
改为 wsDiff
。
不是写入 ws
副本,而是将更改汇总到 wsDiff
。
作为奖励,复制到目标 wb
会更快。
Time table: cpu_count=2, 10 Worksheets, workload:
def ws_job(...
Job Processes without mp 2 4
Time 0:00:21.260746 0:00:10.214942 0:00:07.097369
这个工作示例将适合给定的问题 def job
。
,例如:
import multiprocessing as mp
import queue, os, time
import random as rd
import openpyxl
class wsDiff(object):
def __init__(self, row, column, value):
self.row = row
self.column = column
self.value = value
def ws_job(wb, ws_idx):
ws = wb.worksheets[ws_idx]
print('pid %s: process (%s)' % (os.getpid(), ws.title))
# *** DO SOME STUFF HERE***
# Simulate workload
time.sleep(rd.randrange(1, 4))
diff = []
for i in range(1, 11):
if ws.cell(row=i, column=2).value == 'Target':
#ws.cell(row=i, column=3).value = 'OK'
diff.append( wsDiff(i, 3, 'OK') )
return diff
def job(fq, q, wb):
while True:
try:
ws_idx = fq.get_nowait()
except queue.Empty:
print('pid %s: exit job' % os.getpid())
exit(0)
q.put((ws_job(wb, ws_idx), ws_idx))
time.sleep(0.1)
def writer(q, wb):
print('start writer_handler')
while True:
try:
diff, i_ws = q.get()
except ValueError:
print('writer ValueError exit(1)')
exit(1)
if diff == None:
wb.save('../test/example_output.xlsx')
exit(0)
ws = wb.worksheets[i_ws]
print('pid %s: update sheet %s from diff' % (os.getpid(), ws.title))
for d in diff:
ws.cell(row=d.row, column=d.column).value = d.value
def mpRun():
wb = openpyxl.load_workbook('../test/example.xlsx')
f_q = mp.Queue()
for i in range(len(wb.worksheets)):
f_q.put(i)
w_q = mp.Queue()
w_p = mp.Process(target=writer, args=(w_q, wb))
w_p.start()
time.sleep(0.1)
pool = [mp.Process(target=job, args=(f_q, w_q, wb)) for p in range(os.cpu_count() + 2)]
for p in pool:
p.start()
time.sleep(0.1)
for p in pool:
p.join()
time.sleep(0.2)
# Terminate Process w_p after all Sheets done
w_q.put((None, None))
w_p.join()
print('EXIT __main__')
测试 Python:3.4.2 - openpyxl:2.4.1 - LibreOffice:4.3.3.2