Python异步文件下载+解析+输出到JSON

Python asynchronous file download + parsing + outputting to JSON

为了简要说明上下文,我正在下载 SEC 招股说明书数据 for example。下载后我想解析文件以提取某些数据,然后将解析后的字典输出到包含字典列表的 JSON 文件。我会使用 SQL 数据库进行输出,但我大学的研究集群管理员让我访问的速度很慢。如果有人对以后如何轻松存储数据有任何建议 reading/writing,我将不胜感激,我正在考虑将 HDF5 作为一种可能的替代方案。

我正在对我认为需要改进的地方进行标记的最小示例。

def classify_file(doc):
    try:
        data = {
            'link': doc.url
        }
    except AttributeError:
        return {'flag': 'ATTRIBUTE ERROR'}
    # Do a bunch of parsing using regular expressions

if __name__=="__main__":
    items = list()
    for d in tqdm([y + ' ' + q for y in ['2019'] for q in ['1']]):
        stream = os.popen('bash ./getformurls.sh ' + d)
        stacked = stream.read().strip().split('\n')
        # split each line into the fixed-width fields
        widths=(12,62,12,12,44)
        items += [[item[sum(widths[:j]):sum(widths[:j+1])].strip() for j in range(len(widths))] for item in stacked]
    urls = [BASE_URL + item[4] for item in items]

    resp = list()
    # PROBLEM 1
    filelimit = 100
    for i in range(ceil(len(urls)/filelimit)):
        print(f'Downloading: {i*filelimit/len(urls)*100:2.0f}%...   ',end='\r',flush=True)
        resp += [r for r in grequests.map((grequests.get(u) for u in urls[i*filelimit:(i+1)*filelimit]))]

    # PROBLEM 2
    with Pool() as p:
        rs = p.map_async(classify_file,resp,chunksize=20)
        rs.wait()
        prospectus = rs.get()
    with open('prospectus_data.json') as f:
        json.dump(prospectus,f)

引用的 getfileurls.sh 是我编写的 bash 脚本,它比 python 中的脚本更快,因为我可以使用 grep,其代码是

#!/bin/bash
BASE_URL="https://www.sec.gov/Archives/"
INDEX="edgar/full-index/"

url="${BASE_URL}${INDEX}/QTR/form.idx"
out=$(curl -s ${url} | grep "^485[A|B]POS")
echo "$out"

问题 1:所以我目前在 grequests 映射调用中提取了大约 18k 个文件。我 运行 犯了一个关于打开的文件太多的错误,所以我决定将 url 列表拆分为可管理的块。我不喜欢这个解决方案,但它确实有效。

问题 2:这是我的实际错误所在。此代码在我的笔记本电脑上的一组较小的 url (~2k) 上运行良好(使用我的 100% cpu 和 ~20GB RAM ~10GB 用于文件下载,另外 ~10GB 用于解析开始),但是当我在研究集群上使用 40 个内核将它带到更大的 18k 数据集时,它旋转到 ~100GB RAM 和 ~3TB 交换使用,然后在 20 分钟内通过来自服务器的 KeyboardInterrupt 解析大约 2k 文档后崩溃。

我真的不明白为什么交换使用变得如此疯狂,但我想我真的只是需要内存管理方面的帮助。有没有办法创建一个未发送请求的生成器,当我稍后调用 classify_file() 时将发送这些请求?任何帮助将不胜感激。

通常,当您使用 Pool 时内存使用失控,这是因为工作人员正在 re-used 并在每次迭代中积累内存。您偶尔可以关闭 re-open 池以防止这种情况发生,但这是一个非常常见的问题,Python 现在有一个 built-in 参数可以为您完成...

Pool(...maxtasksperchild) 是工作进程在退出并被新的工作进程替换之前可以完成的任务数,以启用未使用的资源释放。默认的 maxtasksperchild 是 None,这意味着工作进程将与池一样长。

我无法告诉您什么是正确的值,但您通常希望将其设置得足够低,以便可以相当频繁地释放资源,但又不会太低以致减慢速度。 (也许需要一分钟的处理时间……只是猜测)

with Pool(maxtasksperchild=5) as p:
    rs = p.map_async(classify_file,resp,chunksize=20)
    rs.wait()
    prospectus = rs.get()

对于您的第一个问题,您可以考虑只使用 requests 并将调用移到您已有的工作进程中。提取 18K 的 URL 并缓存所有这些数据最初将需要时间和内存。如果它全部封装在 worker 中,您将最大限度地减少数据使用,并且您不需要启动那么多打开的文件句柄。