Python 多进程并行写入单个 gzip

Python multiprocessing write to single gzip in parallel

我正在尝试使用 python 将一个大的压缩文件 (.gz) 复制到另一个压缩文件 (.gz)。我将对我的代码示例中不存在的数据执行中间处理。我会 喜欢 能够使用带锁的多处理从多个进程并行写入新的 gzip,但是我在输出 gz 文件上遇到无效格式错误。

我假设这是因为锁不足以支持并行写入 gzip。由于压缩数据需要“了解”之前的数据才能正确输入存档,因此我认为 python 默认情况下无法处理此问题。我猜想每个进程都保持自己对 gzip 输出的感知,并且这种状态在第一次写入后会有所不同。

如果我在脚本中打开目标文件而不使用 gzip 那么这一切都有效。我也可以写入多个 gzip 并合并它们,但如果可能的话我更愿意避免这种情况。

这是我的源代码:

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break
        
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush() 
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]   
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))

    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

代码注意事项:

我猜我需要一个库来协调不同进程之间的压缩写入。显然,这说明使用单个进程来执行写入(如协调进程),但这可能会引入瓶颈。

那里有 some related posts on stack but none that seem to specifically address what I am trying to do. I also see utilities like "mgzip", "pigz" and "migz" 可以并行压缩,但我认为它们不适用于此用例。mgzip 在我的测试中不起作用(0 大小的文件),pigz 出现在命令行上使用整个文件作为输入,而 migz 是一个 java 库,所以我不确定如何将它集成到 python.

如果无法完成,那就这样吧,但如有任何答复,我们将不胜感激!

------ 更新和工作代码:

在 Mark Adler 的帮助下,我能够创建一个多处理脚本来并行压缩数据,并有一个写入进程将其添加到目标 gz 文件。借助现代 NVME 驱动器的吞吐量,这降低了在成为 I/O 约束之前成为 CPU 受压缩约束的可能性。

为使此代码正常工作需要进行的最大更改如下:

值得注意的是,这个不会并行写入文件,而是并行处理压缩。无论如何,压缩是这个过程中最繁重的部分。

更新的代码(经过测试并按原样工作):

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)            
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()     
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

将每个线程的完整 gzip 流写入单个输出文件实际上非常简单。是的,您将需要一个线程来完成所有的写入工作,每个压缩线程轮流写入其 gzip 流的 all,然后另一个压缩线程开始写入任何内容。压缩线程都可以并行压缩,但是写入需要串行化。

之所以可行,是因为 gzip 标准 RFC 1952 表示 gzip 文件由一系列成员组成,其中每个成员都是一个 gzip header、压缩数据和 gzip预告片。