如何在使用 Pool.map() 进行多处理时解决内存问题?

How to solve memory issues while multiprocessing using Pool.map()?

我已经将程序(下)写到:

一切都很好,该程序在我的小型测试数据集上运行良好。但是,当我输入大数据(大约 14 GB)时,内存消耗呈指数增长,然后冻结计算机或被杀死(在 HPC 集群中)。

我添加了代码以在 data/variable 无用时立即清除内存。我也会在完成后立即关闭游泳池。仍然有 14 GB 输入,我只期望 2*14 GB 内存负担,但似乎很多事情都在发生。我也尝试使用 chunkSize and maxTaskPerChild, etc 进行调整,但我没有看到测试与大文件的优化有任何差异。

我认为当我开始 multiprocessing 时,需要在此代码位置改进此代码 is/are。

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) 但是,我发布了整个代码。

测试示例: 我创建了一个最大 250 mb 的测试文件 ("genome_matrix_final-chr1234-1mb.txt") 和 运行 程序。当我检查系统监视器时,我可以看到内存消耗增加了大约 6 GB。我不太清楚为什么 250 mb 文件加上一些输出占用了这么多内存 space。如果它有助于查看真正的问题,我已经通过投递箱共享了该文件。 https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

有人可以建议我如何解决这个问题吗?

我的 python 脚本:

#!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()


''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file 

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()


    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()


'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)


    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')


    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'


    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')


''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.


if __name__ == '__main__':
    main()

赏金猎人更新:

我已经使用 Pool.map() 实现了多处理,但代码造成了很大的内存负担(输入测试文件 ~ 300 MB,但内存负担约为 6 GB)。我只期望最大 3*300 MB 的内存负担。

我遇到了同样的问题。我需要处理一个巨大的文本语料库,同时在内存中保留加载数百万行的少数 DataFrame 的知识库。我认为这个问题很常见,所以我会保持我的回答面向一般目的。

组合 设置为我解决了问题(1 & 3 & 5 可能只适合你):

  1. 使用Pool.imap(或imap_unordered)代替Pool.map。这将延迟迭代数据,而不是在开始处理之前将所有数据加载到内存中。

  2. chunksize 参数设置一个值。这也会使 imap 更快。

  3. maxtasksperchild 参数设置一个值。

  4. 将输出追加到磁盘而不是内存。当它达到一定大小时立即或每隔一段时间。

  5. 运行 不同批次的代码。如果你有一个迭代器,你可以使用 itertools.islice。这个想法是将你的 list(gen_matrix_df_list.values()) 分成三个或更多列表,然后你只将前三分之一传递给 mapimap,然后将第二个三分之一传递给另一个 运行,等等. 因为你有一个列表,你可以简单地在同一行代码中将它切片。

当您使用 multiprocessing.Pool 时,将使用 fork() 系统调用创建许多子进程。这些进程中的每一个都以当时父进程内存的精确副本开始。因为您在创建大小为 3 的 Pool 之前加载了 csv,所以池中的这 3 个进程中的每一个都将不必要地拥有数据帧的副本。 (gen_matrix_df 以及 gen_matrix_df_list 将存在于当前进程以及 3 个子进程中的每一个中,因此这些结构中的每一个将在内存中有 4 个副本)

尝试在加载文件之前创建 Pool(实际上是在最开始)这应该会减少内存使用量。

如果还是太高,您可以:

  1. 将gen_matrix_df_list转储到文件中,每行1项,例如:

    import os
    import cPickle
    
    with open('tempfile.txt', 'w') as f:
        for item in gen_matrix_df_list.items():
            cPickle.dump(item, f)
            f.write(os.linesep)
    
  2. 在迭代器上使用 Pool.imap() 遍历您在此文件中转储的行,例如:

    with open('tempfile.txt', 'r') as f:
        p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))
    

    (请注意,在上面的示例中,matrix_to_vcf 采用 (key, value) 元组,而不仅仅是一个值)

希望对您有所帮助。

注意:我还没有测试上面的代码。这只是为了证明这个想法。

关于多处理内存的一般答案

您问的是:"What is causing so much memory to be allocated"。答案取决于两部分。

首先,正如您已经注意到的,每个multiprocessing worker 都有自己的数据副本(引用from here),所以你应该分块大参数。或者对于大文件,如果可能的话,一次读一点点。

By default the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1. The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process.

This can be problematic for large arguments as they will be reallocated n_jobs times by the workers.

第二,如果你想回收内存,你需要明白python与其他语言的工作方式不同,你是依靠del to release the memory when it doesn't。我不知道这是否最好,但在我自己的代码中,我已经克服了将变量重新分配给 None 或空对象的问题。

针对您的具体示例 - 最少的代码编辑

只要您可以两次 将大数据放入内存,我认为您只需更改一行就可以完成您想要做的事情。我写了非常相似的代码,当我重新分配变量时它对我有用(副调用 del 或任何类型的垃圾收集)。如果这不起作用,您可能需要按照上述建议并使用 disk I/O:

    #### earlier code all the same
    # clear memory by reassignment (not del or gc)
    gen_matrix_df = {}

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    #del gen_matrix_df_list  # I suspect you don't even need this, memory will free when the pool is closed

    p.close()
    p.join()
    #### later code all the same

针对您的具体示例 - 最佳内存使用

只要你的大数据可以一次,并且你知道你的文件有多大,你可以使用Pandas read_csv partial file reading, to read in only nrows at a time if you really want to micro-manage how much data is being read in, or a [fixed amount of memory at a time using chunksize], which returns an iterator5。我的意思是,nrows 参数只是一次读取:您可以使用它来查看文件,或者如果出于某种原因您希望每个部分具有完全相同的行数(因为,例如,如果您的任何数据是可变长度的字符串,则每一行将不会占用相同数量的内存)。但我认为,为了为多处理准备一个文件,使用块会容易得多,因为这直接与内存相关,这是您关心的问题。使用试错法根据特定大小的块来适应内存比行数更容易,这将根据行中的数据量改变内存使用量。唯一的另一个困难部分是,出于某些特定于应用程序的原因,您要对一些行进行分组,所以这只会让它变得有点复杂。以您的代码为例:

   '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    #not sure why you need the ordered dict here, might add memory overhead
    #gen_matrix_df_list = collections.OrderedDict()  
    #a defaultdict won't throw an exception when we try to append to it the first time. if you don't want a default dict for some reason, you have to initialize each entry you care about.
    gen_matrix_df_list = collections.defaultdict(list)   
    chunksize = 10 ** 6

    for chunk in pd.read_csv(genome_matrix_file, sep='\t', names=header, chunksize=chunksize)
        # now, group the dataframe by chromosome/contig - so it can be multiprocessed
        gen_matrix_df = chunk.groupby('CHROM')
        for chr_, data in gen_matrix_df:
            gen_matrix_df_list[chr_].append(data)

    '''Having sorted chunks on read to a list of df, now create single data frames for each chr_'''
    #The dict contains a list of small df objects, so now concatenate them
    #by reassigning to the same dict, the memory footprint is not increasing 
    for chr_ in gen_matrix_df_list.keys():
        gen_matrix_df_list[chr_]=pd.concat(gen_matrix_df_list[chr_])

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
    p.close()
    p.join()

先决条件

  1. 在Python(下面我使用64位版本的Python 3.6.5)一切皆对象。这有它的开销,使用 getsizeof 我们可以准确地看到一个对象的大小(以字节为单位):

    >>> import sys
    >>> sys.getsizeof(42)
    28
    >>> sys.getsizeof('T')
    50
    
  2. 当使用 fork 系统调用(*nix 默认,参见 multiprocessing.get_start_method())创建子进程时,不会复制父进程的物理内存,并使用 copy-on-write 技术。
  3. 分叉子进程仍将报告父进程的完整 RSS(驻留集大小)。由于这个事实,PSS(比例集大小)是更合适的度量来估计分叉应用程序的内存使用情况。这是页面中的示例:
  • Process A has 50 KiB of unshared memory
  • Process B has 300 KiB of unshared memory
  • Both process A and process B have 100 KiB of the same shared memory region

Since the PSS is defined as the sum of the unshared memory of a process and the proportion of memory shared with other processes, the PSS for these two processes are as follows:

  • PSS of process A = 50 KiB + (100 KiB / 2) = 100 KiB
  • PSS of process B = 300 KiB + (100 KiB / 2) = 350 KiB

数据框

不让我们单独看看你的 DataFramememory_profiler 会帮助我们的。

justpd.py

#!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

现在让我们使用分析器:

mprof run justpd.py
mprof plot

可以看到剧情:

和line-by-line跟踪:

Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10                             
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12                                 
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

我们可以看到数据帧在构建时占用了 ~2 GiB,峰值为 ~3 GiB。更有趣的是 info 的输出。

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

但是info(memory_usage='deep')("deep"表示通过询问objectdtypes对数据进行深度内省,见下文)给出:

memory usage: 7.9 GB

咦?!从流程外部来看,我们可以确保 memory_profiler 的数字是正确的。 sys.getsizeof 也为框架显示相同的值(很可能是因为自定义 __sizeof__),其他使用它来估计分配的​​工具也是如此 gc.get_objects(),例如pympler.

# added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()   

给出:

                                             types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

那么这些 7.93 GiB 从何而来?让我们试着解释一下。我们有 4M 行和 34 列,这给了我们 134M 个值。它们是 int64object(这是一个 64 位指针;有关详细说明,请参阅 using pandas with large data)。因此,对于数据框中的值,我们只有 134 * 10 ** 6 * 8 / 2 ** 20 ~1022 MiB。剩下的 ~ 6.93 GiB 呢?

字符串实习

要理解这种行为,有必要知道 Python 进行字符串驻留。有两篇不错的文章(PyASCIIObjectone, two) about string interning in Python 2. Besides the Unicode change in Python 3 and PEP 393 in Python 3.3 the C-structures have changed, but the idea is the same. Basically, every short string that looks like an identifier will be cached by Python in an internal dictionary and references will point to the same Python objects. In other word we can say it behaves like a singleton. Articles that I mentioned above explain what significant memory profile and performance improvements it gives. We can check if a string is interned using interned字段:

import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

然后:

>>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

对于两个字符串,我们也可以进行身份​​比较(在 CPython 的情况下在内存比较中解决)。

>>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

因此,关于 object dtype,数据框最多分配 20 个字符串(每个氨基酸一个)。不过,值得注意的是 Pandas 推荐 categorical types 用于枚举。

Pandas内存

因此我们可以像这样解释 7.93 GiB 的天真估计:

>>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58  
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

请注意,str_size 是 58 个字节,而不是我们在上面看到的 1 个字符文字的 50 个字节。这是因为 PEP 393 定义了 compact 和 non-compact 字符串。你可以用 sys.getsizeof(gen_matrix_df.REF[0]).

查看

gen_matrix_df.info() 报告的实际内存消耗应该约为 1 GiB,是两倍。我们可以假设它与 Pandas 或 NumPy 完成的内存(预)分配有关。下面的实验证明不无道理(多次运行显示保存图片):

Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12                             
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])    
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

我想引用 Pandas 的原作者 fresh article about design issues and future Pandas2 的话来结束本节。

pandas rule of thumb: have 5 to 10 times as much RAM as the size of your dataset

进程树

我们终于来到游泳池了,看看能不能利用copy-on-write。我们将使用 smemstat (available form an Ubuntu repository) to estimate process group memory sharing and glances 记下 system-wide 空闲内存。都可以写JSON。

我们将 运行 原始脚本与 Pool(2)。我们需要 3 个终端 windows.

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

然后 mprof plot 产生:

总和图 (mprof run --nopython --include-children ./script.py) 如下所示:

请注意,上面的两个图表显示的是 RSS。假设是因为 copy-on-write 它并不反映实际的内存使用情况。现在我们有来自 smemstatglances 的两个 JSON 文件。我将使用以下脚本将 JSON 文件转换为 CSV。

#!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()    
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:            
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

首先让我们看看free内存。

第一个和最小值之间的差异约为 4.15 GiB。这是 PSS 数字的样子:

总和:

因此我们可以看到,由于 copy-on-write,实际内存消耗约为 4.15 GiB。但是我们仍在序列化数据以通过 Pool.map 将其发送到工作进程。我们也可以在这里利用 copy-on-write 吗?

共享数据

要使用 copy-on-write,我们需要让 list(gen_matrix_df_list.values()) 可以全局访问,以便 fork 之后的工作人员仍然可以读取它。

  1. 让我们修改maindel gen_matrix_df之后的代码如下:

    ...
    global global_gen_matrix_df_values
    global_gen_matrix_df_values = list(gen_matrix_df_list.values())
    del gen_matrix_df_list
    
    p = Pool(2)
    result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
    ...
    
  2. 删除后面的 del gen_matrix_df_list
  3. 并修改 matrix_to_vcf 的第一行,如:

    def matrix_to_vcf(i):
        matrix_df = global_gen_matrix_df_values[i]
    

现在让我们re-run吧。可用内存:

进程树:

其总和:

因此我们最多有 ~2.9 GiB 的实际备忘录y 用法(峰值主进程在构建数据框架时有)和 copy-on-write 有帮助!

作为旁注,有所谓的 copy-on-read,Python 的引用循环垃圾收集器的行为,described in Instagram Engineering (which led to gc.freeze in issue31558)。但是 gc.disable() 在这种特殊情况下没有影响。

更新

copy-on-write copy-less 数据共享的替代方法是从一开始就使用 numpy.memmap. Here's an example implementation from High Performance Data Processing in Python talk. The 将其委托给内核,然后让 Pandas 使用 mmaped Numpy 数组。