分块使用加速器数据集

Working with an accelerator dataset in chunks

我们在 accelerator 中有一个庞大的数据集,我们需要在其中对每一行执行一些非常昂贵的操作。 如果我们想一次完成整个集合,那将需要数周时间,所以我们希望一次只取一小部分,比如 1%,然后在晚上进行。

目前,所有数据都在一个数据集中。设置工作的选项有哪些? 第 100 万到 200 万行?

我可以看到三个可能的路径:

保留数据集

  1. 添加一个新列,填入None
  2. 添加一种跳过前 n+ 百万行然后为下百万更新该列的方法

我的方法问题是它需要跳过 n^2 / 2 行,而我们有很多行

创建链式数据集。

遍历主数据集并将每一行添加到新数据集,每百万分之一行创建新数据集并与旧数据集链接

这个应该更快,它需要更多 space 但它仍然有用。

使用魔术方法将数据集拆分成链

就用标准库中的方法把数据集拆分成链,这个就不错了!

理想情况下,您将从一开始就将数据作为具有合理块的链导入,但如果由于某种原因这不切实际,我认为拆分您拥有的数据集是可行的方法。 (正如你所说,如果 n 不小,n^2 就不好。)

可能应该添加执行此操作的标准方法。这种方法可能如下所示:

options = dict(lines_per_dataset=int)
datasets = ('source', 'previous',)

def prepare(job, slices):
    previous = datasets.previous
    writers = []
    for sliceno, linecnt in enumerate(datasets.source.lines):
        writers.append([])
        for ix in range(0, linecnt, options.lines_per_dataset):
            previous = job.datasetwriter(
                columns=datasets.source.columns,
                previous=previous,
                name='%d_%d' % (sliceno, ix,),
                for_single_slice=sliceno,
            )
            writers[-1].append(previous)
    return writers

def analysis(sliceno, prepare_res):
    per = options.lines_per_dataset
    writers = iter(prepare_res[sliceno])
    for ix, data in enumerate(datasets.source.iterate(sliceno)):
        if ix % per == 0:
            write = next(writers).get_split_write_list()
        write(data)

def synthesis(prepare_res):
    # We want the last one to be the default dataset
    for r in prepare_res:
        for dw in r:
            ds = dw.finish()
    ds.link_to_here()

请注意,我不会在此处传播散列标签,因为如果源是散列分区的,那么每个数据集的数据只会留在一个切片中。