tf.data: 并行加载步骤

tf.data: Parallelize loading step

我有一个数据输入管道:

我一直在尝试将其放入 tf.data 管道中,但我一直坚持 运行 对多个数据点进行并行预处理。到目前为止我试过这个:

我在这里遗漏了什么吗?我是否被迫修改我的预处理,以便它可以在图表中 运行 或者有没有办法对其进行多处理?

我们以前的方法是使用 keras.Sequence,效果很好,但是有太多人推动升级到 tf.data API。 (见鬼,即使尝试使用 tf 2.2 的 keras.Sequence 也会产生 WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.

注意:我使用的是 tf 2.2rc3

您可以尝试在输入管道中的 map() 之前添加 batch()

通常是为了减少小map函数调用map函数的开销,看这里: https://www.tensorflow.org/guide/data_performance#vectorizing_mapping

不过,您也可以使用它来获取地图的一批输入 py_function 并在那里使用 python multiprocessing 来加快处理速度。

通过这种方式,您可以绕过 GIL 限制,这使得 tf.data.map() 中的 num_parallel_calls 对于 py_function 映射函数无用。

我遇到了同样的问题并找到了一个(相对)简单的解决方案。

事实证明,这样做的正确方法确实是首先使用 from_generator(gen) 方法创建一个 tf.data.Dataset 对象,然后再应用您的自定义 python 处理函数(包装在py_function) 与 map 方法。正如您提到的,有一个技巧可以避免输入的序列化/反序列化。

诀窍是使用只生成训练集索引的生成器。每个调用的训练索引都将传递给包装的 py_function,它可以在 return 中评估该索引处的原始数据集。然后,您可以处理您的数据点和 return 处理后的数据到 tf.data 管道的其余部分。

def func(i):
    i = i.numpy() # decoding from the EagerTensor object
    x, y = processing_function(training_set[i])
    return x, y # numpy arrays of types uint8, float32

z = list(range(len(training_set))) # the index generator

dataset = tf.data.Dataset.from_generator(lambda: z, tf.uint8)

dataset = dataset.map(lambda i: tf.py_function(func=func, inp=[i], 
                                               Tout=[tf.uint8, tf.float32]), 
                      num_parallel_calls=12)

dataset = dataset.batch(1)

请注意,在实践中,根据您训练数据集的模型,您可能需要在 batch:

之后对数据集应用另一个 map
def _fixup_shape(x, y):
    x.set_shape([None, None, None, nb_channels])
    y.set_shape([None, nb_classes])
    return x, y
dataset = dataset.map(_fixup_shape)

这是一个已知问题,似乎是由于 from_generator 方法在某些情况下无法正确推断形状。因此,您需要明确传递预期的输出形状。更多信息: