Missing lines when writing file with multiprocessing Lock Python
Here is my code:
from multiprocessing import Pool, Lock
from datetime import datetime as dt

console_out = "/STDOUT/Console.out"
chunksize = 50
lock = Lock()

def writer(message):
    lock.acquire()
    with open(console_out, 'a') as out:
        out.write(message)
        out.flush()
    lock.release()

def conf_wrapper(state):
    import ProcessingModule as procs
    import sqlalchemy as sal
    stcd, nrows = state
    engine = sal.create_engine('postgresql://foo:bar@localhost:5432/schema')
    writer("State {s} started at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))
    with engine.connect() as conn, conn.begin():
        procs.processor(conn, stcd, nrows, chunksize)
    writer("\tState {s} finished at: {n}"
           "\n".format(s=str(stcd).zfill(2), n=dt.now()))

def main():
    nprocesses = 12
    maxproc = 1
    state_list = [(2, 113), (10, 119), (15, 84), (50, 112), (44, 110), (11, 37), (33, 197)]
    with open(console_out, 'w') as out:
        out.write("Starting at {n}\n".format(n=dt.now()))
        out.write("Using {p} processes..."
                  "\n".format(p=nprocesses))
        with Pool(processes=int(nprocesses), maxtasksperchild=maxproc) as pool:
            pool.map(func=conf_wrapper, iterable=state_list, chunksize=1)
        out.write("\nAll done at {n}".format(n=dt.now()))
The file console_out never contains all 7 states. It always misses one or more of them. Here is the output from the latest run:
Starting at 2016-07-27 21:46:58.638587
Using 12 processes...
State 44 started at: 2016-07-27 21:47:01.482322
State 02 started at: 2016-07-27 21:47:01.497947
State 11 started at: 2016-07-27 21:47:01.529198
State 10 started at: 2016-07-27 21:47:01.497947
State 11 finished at: 2016-07-27 21:47:15.701207
State 15 finished at: 2016-07-27 21:47:24.123164
State 44 finished at: 2016-07-27 21:47:32.029489
State 50 finished at: 2016-07-27 21:47:51.203107
State 10 finished at: 2016-07-27 21:47:53.046876
State 33 finished at: 2016-07-27 21:47:58.156301
State 02 finished at: 2016-07-27 21:48:18.856979
All done at 2016-07-27 21:48:18.992277
Why?
Note: the OS is Windows Server 2012 R2.
Since you're running on Windows, nothing is inherited by the worker processes. Each process runs the entire main program "from scratch".
In particular, as the code is written, each process has its own instance of lock, and these instances have nothing to do with one another. In short, lock provides no inter-process mutual exclusion at all.
To fix this, change the Pool constructor so that it calls an initialization function once in each process, and pass your Lock() instance to that function. For example:

def init(L):
    global lock
    lock = L
Then add these arguments to the Pool() constructor:

    initializer=init, initargs=(Lock(),),
and you no longer need the

    lock = Lock()

line.
Then inter-process mutual exclusion will work as intended.
Without a lock
If you'd rather delegate all output to a dedicated writer process, you can skip the lock and use a queue instead [see a different version later].
def writer_process(q):
    with open(console_out, 'w') as out:
        while True:
            message = q.get()
            if message is None:
                break
            out.write(message)
            out.flush()  # can't guess whether you really want this
and change writer() to:

def writer(message):
    q.put(message)
You'll again need to use initializer= and initargs= in the Pool constructor so that all the processes use the same queue.
Only one process should run writer_process(), and it can be started on its own as a multiprocessing.Process instance.
Finally, to let writer_process() know that it's time to drain the queue and return, just run

    q.put(None)

in the main process.
Later
The OP went with this version instead, because they needed other code to open the output file at the same time:
def writer_process(q):
    while True:
        message = q.get()
        if message == 'done':
            break
        else:
            with open(console_out, 'a') as out:
                out.write(message)
I don't know why the terminating sentinel was changed to "done". Any unique value works for this; None is conventional.