将 BigQuery 的导出并行化到 Python 中的本地 JSON 文件

Parallelize the export from BigQuery to local JSON file in Python

我们在 BigQuery 中有一个 table,我们需要将其导出到本地换行符分隔的 JSON 文件中。使用 BigQuery 的导出到 GCS 功能是有问题的,因为它将整数类型转换为字符串,请参阅 and 并且我自己尝试过并且整数丢失了。我们提出了以下解决方案,它保留了整数类型,但是 非常慢:

当前工作代码

bq = bigquery.Client()
our_query = "select * from our_project.our_dataset.our_bq_table"

results_row_iter = bq.query(our_query) # google.bigquery.rowIterator
counter = 0
with open('/tmp/output_file.json', 'w') as f:
    for row in results_row_iter:
        f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson

our_bq_table 在 BigQuery 中是 5GB,有 340 万行和约 100 个字段,上面的 for 循环在我们的 table 上花费了 90 分钟。 our_bq_table 在整数列 confId 上分区,table 中有约 100 个唯一的 confId,值为 1 - 100。我们想利用分区键 + 并行化来加速完成这个过程...不知何故。

我们想要的伪代码

bq = bigquery.Client()
base_query = "select * from our_project.our_dataset.our_bq_table"

all_conf_ids = range(1, 100)

def dump_conf_id(base_query, id):
    iter_query = f"{base_query} where confId = {id}"
    results_row_iter = bq.query(iter_query)
    counter = 0
    with open(f'output_file-{id}.json', 'w') as f:
        for row in results_row_iter:
            f.write(json.dumps(dict(row), default=str) + '\n') # dumps as ndjson

in parallel:
    for id in all_conf_ids:
        dump_conf_id(id)

# last step, perhaps concat the separate files into 1 somehow, assuming there are multiple output files...

此方法利用 confId 字段,使我们的 BigQuery 查询保持较小。我不太确定如何在伪代码之外实现它,并且在 python 中弄清楚多线程与多处理与其他并行化方式让我不知所措。我们的最终输出需要是单个输出文件,伪代码转储到单独的文件中,但如果我们可以并行转储到单个文件中,那也很棒。

编辑:我们在实施解决方案之前试图解决的一个关键问题是我们应该为此使用多处理还是多线程,鉴于 this 正在并行转储到本地 .json...

两条经验法则:

  • 在 Python 中,您可以在程序主要受 IO 限制的情况下使用多线程,但如果它受 CPU 限制,则必须使用多处理。这是因为 Python 的 Global Interpreter Lock.
  • 你不应该在同一个程序中混合使用多线程和多处理。

在这种情况下,我猜你的问题(每个结果导出一个 JSON)是 CPU-bound,所以我建议多处理。这是一个代码示例:

from multiprocessing import Pool

n_jobs = 4
with Pool(n_jobs) as p:
    # call dump_conf_id on each member of all_conf_ids
    print(p.map(dump_conf_id, all_conf_ids))

[...] but if we can dump into a single file in parallel that would be great too.

我不会费心尝试将它们写入单个文件。读取文件并将它们连接起来可能是您正在做的最快的部分。快速基准测试显示它运行在大约 780 MB/s:

import subprocess

with open("output.json", "wb") as f:
    filenames = [f'output_file-{id}.json' for i in all_conf_ids]
    subprocess.check_call(["cat", *filesnames], stdout=f)