分块使用加速器数据集
Working with an accelerator dataset in chunks
我们在 accelerator 中有一个庞大的数据集,我们需要在其中对每一行执行一些非常昂贵的操作。
如果我们想一次完成整个集合,那将需要数周时间,所以我们希望一次只取一小部分,比如 1%,然后在晚上进行。
目前,所有数据都在一个数据集中。设置工作的选项有哪些?
第 100 万到 200 万行?
我可以看到三个可能的路径:
保留数据集
- 添加一个新列,填入None
- 添加一种跳过前 n+ 百万行然后为下百万更新该列的方法
我的方法问题是它需要跳过 n^2 / 2 行,而我们有很多行
创建链式数据集。
遍历主数据集并将每一行添加到新数据集,每百万分之一行创建新数据集并与旧数据集链接
这个应该更快,它需要更多 space 但它仍然有用。
使用魔术方法将数据集拆分成链
就用标准库中的方法把数据集拆分成链,这个就不错了!
理想情况下,您将从一开始就将数据作为具有合理块的链导入,但如果由于某种原因这不切实际,我认为拆分您拥有的数据集是可行的方法。 (正如你所说,如果 n 不小,n^2 就不好。)
可能应该添加执行此操作的标准方法。这种方法可能如下所示:
options = dict(lines_per_dataset=int)
datasets = ('source', 'previous',)
def prepare(job, slices):
previous = datasets.previous
writers = []
for sliceno, linecnt in enumerate(datasets.source.lines):
writers.append([])
for ix in range(0, linecnt, options.lines_per_dataset):
previous = job.datasetwriter(
columns=datasets.source.columns,
previous=previous,
name='%d_%d' % (sliceno, ix,),
for_single_slice=sliceno,
)
writers[-1].append(previous)
return writers
def analysis(sliceno, prepare_res):
per = options.lines_per_dataset
writers = iter(prepare_res[sliceno])
for ix, data in enumerate(datasets.source.iterate(sliceno)):
if ix % per == 0:
write = next(writers).get_split_write_list()
write(data)
def synthesis(prepare_res):
# We want the last one to be the default dataset
for r in prepare_res:
for dw in r:
ds = dw.finish()
ds.link_to_here()
请注意,我不会在此处传播散列标签,因为如果源是散列分区的,那么每个数据集的数据只会留在一个切片中。
我们在 accelerator 中有一个庞大的数据集,我们需要在其中对每一行执行一些非常昂贵的操作。 如果我们想一次完成整个集合,那将需要数周时间,所以我们希望一次只取一小部分,比如 1%,然后在晚上进行。
目前,所有数据都在一个数据集中。设置工作的选项有哪些? 第 100 万到 200 万行?
我可以看到三个可能的路径:
保留数据集
- 添加一个新列,填入None
- 添加一种跳过前 n+ 百万行然后为下百万更新该列的方法
我的方法问题是它需要跳过 n^2 / 2 行,而我们有很多行
创建链式数据集。
遍历主数据集并将每一行添加到新数据集,每百万分之一行创建新数据集并与旧数据集链接
这个应该更快,它需要更多 space 但它仍然有用。
使用魔术方法将数据集拆分成链
就用标准库中的方法把数据集拆分成链,这个就不错了!
理想情况下,您将从一开始就将数据作为具有合理块的链导入,但如果由于某种原因这不切实际,我认为拆分您拥有的数据集是可行的方法。 (正如你所说,如果 n 不小,n^2 就不好。)
可能应该添加执行此操作的标准方法。这种方法可能如下所示:
options = dict(lines_per_dataset=int)
datasets = ('source', 'previous',)
def prepare(job, slices):
previous = datasets.previous
writers = []
for sliceno, linecnt in enumerate(datasets.source.lines):
writers.append([])
for ix in range(0, linecnt, options.lines_per_dataset):
previous = job.datasetwriter(
columns=datasets.source.columns,
previous=previous,
name='%d_%d' % (sliceno, ix,),
for_single_slice=sliceno,
)
writers[-1].append(previous)
return writers
def analysis(sliceno, prepare_res):
per = options.lines_per_dataset
writers = iter(prepare_res[sliceno])
for ix, data in enumerate(datasets.source.iterate(sliceno)):
if ix % per == 0:
write = next(writers).get_split_write_list()
write(data)
def synthesis(prepare_res):
# We want the last one to be the default dataset
for r in prepare_res:
for dw in r:
ds = dw.finish()
ds.link_to_here()
请注意,我不会在此处传播散列标签,因为如果源是散列分区的,那么每个数据集的数据只会留在一个切片中。