multiprocessing.Pool 和咕噜声

multiprocessing.Pool and slurm

我写了一个简单的函数来计算文件中的行数。

def line_count(file_name):
    temp = 0
    with open(file_name,'r') as ref:
        ref.readline()
        for line in ref:
            temp+=1
    return temp,file_name

然后我使用 Pool 将其应用于文件夹中的每个文件:

files=[]
for f in glob.glob(data_directory+'/*.txt'):
    files.append(f)
with Pool(np.min([tot_process,len(files)])) as pool:
    rt = pool.map(line_count,files)

其中 tot_process 是我从运行 Python 代码的 slurm 脚本传递的参数。在 slurm 脚本中,我有以下 header:

#!/bin/bash
#SBATCH --cpus-per-task=60
#SBATCH --partition=normal
#SBATCH --ntasks=1
#SBATCH --mem=0
#SBATCH --job-name=test

由于该函数对于 4000 万行文件大约需要 4 秒,我预计 运行 在包含 60 个文件的文件夹上使用 tot_process=60 的代码将花费大约相同的时间。相反,它需要大约 240 秒。我怀疑我在这里遗漏了一些非常基本的东西,我的脚本没有像它应该的那样 运行 多处理。

你 I/O 绑定了。不管你创建了多少个进程,硬盘本身读取文件的速度也只有这么快。它必须寻找一个文件块,以旋转磁盘的速度读取并重复文件中的块数。并且磁盘通过某种总线(可能是 SATA 或 SCSI)连接到计算机,并且它有自己的速度限制(尽管可能比磁盘本身快得多)。 SSD 驱动器比旋转磁盘快,但一般规则仍然适用。

4000 万行表示演出范围内的文件。在 4 秒时,您将获得 250 MB 的数据传输速率,这是相当快的(假设是标准的本地连接硬盘驱动器)。

您可以通过完全跳过多处理并切换到内存映射文件来获得一些加速。这是一个使用 Direct I/O 读取相当大的块的示例。我还没有分析过这个,所以这只是一个猜测。

import os
import mmap

LINE_COUNT_BLOCKSIZE = 2**25   # 32 Meg

def line_count(filename):
    fd = os.open(filename, os.O_RDONLY | os.O_DIRECT)
    try:
        with mmap.mmap(fd, 0, access=mmap.ACCESS_READ) as mm:
            count = 0
            while True:
                buf = mm.read(LINE_COUNT_BLOCKSIZE)
                if not buf:
                    break
                count += buf.count(b"\n")
            return count
    finally:
        os.close(fd)