Python 多进程并行写入单个 gzip
Python multiprocessing write to single gzip in parallel
我正在尝试使用 python 将一个大的压缩文件 (.gz) 复制到另一个压缩文件 (.gz)。我将对我的代码示例中不存在的数据执行中间处理。我会 喜欢 能够使用带锁的多处理从多个进程并行写入新的 gzip,但是我在输出 gz 文件上遇到无效格式错误。
我假设这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据才能正确输入存档,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。
如果我在脚本中打开目标文件而不使用 gzip 那么这一切都有效。我也可以写入多个 gzip 并合并它们,但如果可能的话我更愿意避免这种情况。
这是我的源代码:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
print("Writer Started.")
while True:
queue_message = data_queue.get()
if (queue_message == 'DONE'):
outfile.flush()
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
print("Writer",ID,"-","Write Lock:",write_lock)
write_lock.acquire()
print("Writer",ID,"-","Write Lock:",write_lock)
for line in queue_message:
print("Line write:",line)
outfile.write(line)
write_lock.release()
print("Writer",ID,"-","Write Lock:",write_lock)
def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
writer_procs=2
chunk_size=1
queue_size=96
data_queue = Queue(queue_size)
coordinator_queue=Queue()
write_lock=Lock()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile = gzip.open(outfile_path, 'wt')
#Works when it is uncompressed
#outfile=open(outfile_path, 'w')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in writers:
process.start()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
代码注意事项:
- “块大小”决定了从输入文件中提取的行数
- 我一直在使用一个小得多的测试文件来尝试完成这项工作
- 我的输入文件在未压缩时超过 200gb
- 我的输出文件在未压缩时将超过 200gb
- 这个版本的代码已经过裁剪,可能会有一些错误,但它是直接基于我的运行。
- 脚本的所有功能区域都已保留。
我猜我需要一个库来协调不同进程之间的压缩写入。显然,这说明使用单个进程来执行写入(如协调进程),但这可能会引入瓶颈。
那里有 some related posts on stack but none that seem to specifically address what I am trying to do. I also see utilities like "mgzip", "pigz" and "migz" 可以并行压缩,但我认为它们不适用于此用例。mgzip 在我的测试中不起作用(0 大小的文件),pigz 出现在命令行上使用整个文件作为输入,而 migz 是一个 java 库,所以我不确定如何将它集成到 python.
如果无法完成,那就这样吧,但如有任何答复,我们将不胜感激!
------
更新和工作代码:
在 Mark Adler 的帮助下,我能够创建一个多处理脚本来并行压缩数据,并有一个写入进程将其添加到目标 gz 文件。借助现代 NVME 驱动器的吞吐量,这降低了在成为 I/O 约束之前成为 CPU 受压缩约束的可能性。
为使此代码正常工作需要进行的最大更改如下:
需要 gzip.compress(bytes(string, 'utf-8'),compresslevel=9)
来压缩单个“块”或“流:
需要 file = open(outfile, 'wb')
才能打开可以成为目标 gzip 的未编码二进制输出文件。
file.write()
操作必须从单个进程发生,因为它必须连续执行。
值得注意的是,这个不会并行写入文件,而是并行处理压缩。无论如何,压缩是这个过程中最繁重的部分。
更新的代码(经过测试并按原样工作):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def compressor(data_queue, compressed_queue, coordinator_queue):
print("Compressor Started.")
while True:
chunk = ''
queue_message = data_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('COMPRESS_DONE')
#Process exit
break
else:
for line in queue_message:
#Assemble concatenated string from list
chunk += line
#Encode the string as binary so that it can be compressed
#Setting gzip compression level to 9 (highest)
compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
compressed_queue.put(compressed_chunk)
def writer(outfile, compressed_queue, coordinator_queue):
print("Writer Started.")
while True:
queue_message = compressed_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
outfile.write(queue_message)
def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_compressors=compressor_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(compressor_procs)]
if queue_message=='COMPRESS_DONE':
active_compressors = active_compressors-1
if active_compressors == 0:
while not compressed_queue.qsize() == 0:
continue
[compressed_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
compressor_procs=2
#writer_procs really needs to stay as 1 since writing must be done serially
#This could probably be written out...
writer_procs=1
chunk_size=600
queue_size=96
data_queue = Queue(queue_size)
compressed_queue=Queue(queue_size)
coordinator_queue=Queue()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile=open(outfile_path, 'wb')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in compressors:
process.start()
for process in writers:
process.start()
for process in compressors:
process.join()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
将每个线程的完整 gzip 流写入单个输出文件实际上非常简单。是的,您将需要一个线程来完成所有的写入工作,每个压缩线程轮流写入其 gzip 流的 all,然后另一个压缩线程开始写入任何内容。压缩线程都可以并行压缩,但是写入需要串行化。
之所以可行,是因为 gzip 标准 RFC 1952 表示 gzip 文件由一系列成员组成,其中每个成员都是一个 gzip header、压缩数据和 gzip预告片。
我正在尝试使用 python 将一个大的压缩文件 (.gz) 复制到另一个压缩文件 (.gz)。我将对我的代码示例中不存在的数据执行中间处理。我会 喜欢 能够使用带锁的多处理从多个进程并行写入新的 gzip,但是我在输出 gz 文件上遇到无效格式错误。
我假设这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据才能正确输入存档,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。
如果我在脚本中打开目标文件而不使用 gzip 那么这一切都有效。我也可以写入多个 gzip 并合并它们,但如果可能的话我更愿意避免这种情况。
这是我的源代码:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
print("Writer Started.")
while True:
queue_message = data_queue.get()
if (queue_message == 'DONE'):
outfile.flush()
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
print("Writer",ID,"-","Write Lock:",write_lock)
write_lock.acquire()
print("Writer",ID,"-","Write Lock:",write_lock)
for line in queue_message:
print("Line write:",line)
outfile.write(line)
write_lock.release()
print("Writer",ID,"-","Write Lock:",write_lock)
def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
writer_procs=2
chunk_size=1
queue_size=96
data_queue = Queue(queue_size)
coordinator_queue=Queue()
write_lock=Lock()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile = gzip.open(outfile_path, 'wt')
#Works when it is uncompressed
#outfile=open(outfile_path, 'w')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in writers:
process.start()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
代码注意事项:
- “块大小”决定了从输入文件中提取的行数
- 我一直在使用一个小得多的测试文件来尝试完成这项工作
- 我的输入文件在未压缩时超过 200gb
- 我的输出文件在未压缩时将超过 200gb
- 这个版本的代码已经过裁剪,可能会有一些错误,但它是直接基于我的运行。
- 脚本的所有功能区域都已保留。
我猜我需要一个库来协调不同进程之间的压缩写入。显然,这说明使用单个进程来执行写入(如协调进程),但这可能会引入瓶颈。
那里有 some related posts on stack but none that seem to specifically address what I am trying to do. I also see utilities like "mgzip", "pigz" and "migz" 可以并行压缩,但我认为它们不适用于此用例。mgzip 在我的测试中不起作用(0 大小的文件),pigz 出现在命令行上使用整个文件作为输入,而 migz 是一个 java 库,所以我不确定如何将它集成到 python.
如果无法完成,那就这样吧,但如有任何答复,我们将不胜感激!
------ 更新和工作代码:
在 Mark Adler 的帮助下,我能够创建一个多处理脚本来并行压缩数据,并有一个写入进程将其添加到目标 gz 文件。借助现代 NVME 驱动器的吞吐量,这降低了在成为 I/O 约束之前成为 CPU 受压缩约束的可能性。
为使此代码正常工作需要进行的最大更改如下:
-
需要
gzip.compress(bytes(string, 'utf-8'),compresslevel=9)
来压缩单个“块”或“流:
需要 file = open(outfile, 'wb')
才能打开可以成为目标 gzip 的未编码二进制输出文件。file.write()
操作必须从单个进程发生,因为它必须连续执行。
值得注意的是,这个不会并行写入文件,而是并行处理压缩。无论如何,压缩是这个过程中最繁重的部分。
更新的代码(经过测试并按原样工作):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def compressor(data_queue, compressed_queue, coordinator_queue):
print("Compressor Started.")
while True:
chunk = ''
queue_message = data_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('COMPRESS_DONE')
#Process exit
break
else:
for line in queue_message:
#Assemble concatenated string from list
chunk += line
#Encode the string as binary so that it can be compressed
#Setting gzip compression level to 9 (highest)
compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
compressed_queue.put(compressed_chunk)
def writer(outfile, compressed_queue, coordinator_queue):
print("Writer Started.")
while True:
queue_message = compressed_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
outfile.write(queue_message)
def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_compressors=compressor_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(compressor_procs)]
if queue_message=='COMPRESS_DONE':
active_compressors = active_compressors-1
if active_compressors == 0:
while not compressed_queue.qsize() == 0:
continue
[compressed_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
compressor_procs=2
#writer_procs really needs to stay as 1 since writing must be done serially
#This could probably be written out...
writer_procs=1
chunk_size=600
queue_size=96
data_queue = Queue(queue_size)
compressed_queue=Queue(queue_size)
coordinator_queue=Queue()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile=open(outfile_path, 'wb')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in compressors:
process.start()
for process in writers:
process.start()
for process in compressors:
process.join()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
将每个线程的完整 gzip 流写入单个输出文件实际上非常简单。是的,您将需要一个线程来完成所有的写入工作,每个压缩线程轮流写入其 gzip 流的 all,然后另一个压缩线程开始写入任何内容。压缩线程都可以并行压缩,但是写入需要串行化。
之所以可行,是因为 gzip 标准 RFC 1952 表示 gzip 文件由一系列成员组成,其中每个成员都是一个 gzip header、压缩数据和 gzip预告片。