tf.data: 并行加载步骤
tf.data: Parallelize loading step
我有一个数据输入管道:
- 不可转换为
tf.Tensor
(字典等)类型的输入数据点
- 无法理解张量流类型并需要处理这些数据点的预处理函数;其中一些在运行中进行数据扩充
我一直在尝试将其放入 tf.data
管道中,但我一直坚持 运行 对多个数据点进行并行预处理。到目前为止我试过这个:
- 使用
Dataset.from_generator(gen)
并在生成器中进行预处理;这是可行的,但它会按顺序处理每个数据点,无论我在其上修补的 prefetch
和假 map
调用的排列方式如何。难道不能并行预取吗?
- 将预处理封装在
tf.py_function
中,这样我就可以 map
在我的数据集上并行处理,但是
- 这需要一些非常丑陋的(反)序列化以适应异国情调
输入字符串张量,
- 显然
py_function
的执行将移交给(单进程)python 解释器,所以我会被 python GIL 困住,它不会帮了我大忙
- 我看到你可以用
interleave
做一些技巧,但还没有发现任何没有前两个想法的问题。
我在这里遗漏了什么吗?我是否被迫修改我的预处理,以便它可以在图表中 运行 或者有没有办法对其进行多处理?
我们以前的方法是使用 keras.Sequence,效果很好,但是有太多人推动升级到 tf.data
API。 (见鬼,即使尝试使用 tf 2.2 的 keras.Sequence 也会产生 WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
)
注意:我使用的是 tf 2.2rc3
您可以尝试在输入管道中的 map()
之前添加 batch()
。
通常是为了减少小map函数调用map函数的开销,看这里:
https://www.tensorflow.org/guide/data_performance#vectorizing_mapping
不过,您也可以使用它来获取地图的一批输入 py_function
并在那里使用 python multiprocessing
来加快处理速度。
通过这种方式,您可以绕过 GIL 限制,这使得 tf.data.map()
中的 num_parallel_calls
对于 py_function
映射函数无用。
我遇到了同样的问题并找到了一个(相对)简单的解决方案。
事实证明,这样做的正确方法确实是首先使用 from_generator(gen)
方法创建一个 tf.data.Dataset
对象,然后再应用您的自定义 python 处理函数(包装在py_function
) 与 map
方法。正如您提到的,有一个技巧可以避免输入的序列化/反序列化。
诀窍是使用只生成训练集索引的生成器。每个调用的训练索引都将传递给包装的 py_function,它可以在 return 中评估该索引处的原始数据集。然后,您可以处理您的数据点和 return 处理后的数据到 tf.data
管道的其余部分。
def func(i):
i = i.numpy() # decoding from the EagerTensor object
x, y = processing_function(training_set[i])
return x, y # numpy arrays of types uint8, float32
z = list(range(len(training_set))) # the index generator
dataset = tf.data.Dataset.from_generator(lambda: z, tf.uint8)
dataset = dataset.map(lambda i: tf.py_function(func=func, inp=[i],
Tout=[tf.uint8, tf.float32]),
num_parallel_calls=12)
dataset = dataset.batch(1)
请注意,在实践中,根据您训练数据集的模型,您可能需要在 batch
:
之后对数据集应用另一个 map
def _fixup_shape(x, y):
x.set_shape([None, None, None, nb_channels])
y.set_shape([None, nb_classes])
return x, y
dataset = dataset.map(_fixup_shape)
这是一个已知问题,似乎是由于 from_generator
方法在某些情况下无法正确推断形状。因此,您需要明确传递预期的输出形状。更多信息:
- https://github.com/tensorflow/tensorflow/issues/32912
- as_list() is not defined on an unknown TensorShape on y_t_rank = len(y_t.shape.as_list()) and related to metrics)
我有一个数据输入管道:
- 不可转换为
tf.Tensor
(字典等)类型的输入数据点 - 无法理解张量流类型并需要处理这些数据点的预处理函数;其中一些在运行中进行数据扩充
我一直在尝试将其放入 tf.data
管道中,但我一直坚持 运行 对多个数据点进行并行预处理。到目前为止我试过这个:
- 使用
Dataset.from_generator(gen)
并在生成器中进行预处理;这是可行的,但它会按顺序处理每个数据点,无论我在其上修补的prefetch
和假map
调用的排列方式如何。难道不能并行预取吗? - 将预处理封装在
tf.py_function
中,这样我就可以map
在我的数据集上并行处理,但是- 这需要一些非常丑陋的(反)序列化以适应异国情调 输入字符串张量,
- 显然
py_function
的执行将移交给(单进程)python 解释器,所以我会被 python GIL 困住,它不会帮了我大忙
- 我看到你可以用
interleave
做一些技巧,但还没有发现任何没有前两个想法的问题。
我在这里遗漏了什么吗?我是否被迫修改我的预处理,以便它可以在图表中 运行 或者有没有办法对其进行多处理?
我们以前的方法是使用 keras.Sequence,效果很好,但是有太多人推动升级到 tf.data
API。 (见鬼,即使尝试使用 tf 2.2 的 keras.Sequence 也会产生 WARNING:tensorflow:multiprocessing can interact badly with TensorFlow, causing nondeterministic deadlocks. For high performance data pipelines tf.data is recommended.
)
注意:我使用的是 tf 2.2rc3
您可以尝试在输入管道中的 map()
之前添加 batch()
。
通常是为了减少小map函数调用map函数的开销,看这里: https://www.tensorflow.org/guide/data_performance#vectorizing_mapping
不过,您也可以使用它来获取地图的一批输入 py_function
并在那里使用 python multiprocessing
来加快处理速度。
通过这种方式,您可以绕过 GIL 限制,这使得 tf.data.map()
中的 num_parallel_calls
对于 py_function
映射函数无用。
我遇到了同样的问题并找到了一个(相对)简单的解决方案。
事实证明,这样做的正确方法确实是首先使用 from_generator(gen)
方法创建一个 tf.data.Dataset
对象,然后再应用您的自定义 python 处理函数(包装在py_function
) 与 map
方法。正如您提到的,有一个技巧可以避免输入的序列化/反序列化。
诀窍是使用只生成训练集索引的生成器。每个调用的训练索引都将传递给包装的 py_function,它可以在 return 中评估该索引处的原始数据集。然后,您可以处理您的数据点和 return 处理后的数据到 tf.data
管道的其余部分。
def func(i):
i = i.numpy() # decoding from the EagerTensor object
x, y = processing_function(training_set[i])
return x, y # numpy arrays of types uint8, float32
z = list(range(len(training_set))) # the index generator
dataset = tf.data.Dataset.from_generator(lambda: z, tf.uint8)
dataset = dataset.map(lambda i: tf.py_function(func=func, inp=[i],
Tout=[tf.uint8, tf.float32]),
num_parallel_calls=12)
dataset = dataset.batch(1)
请注意,在实践中,根据您训练数据集的模型,您可能需要在 batch
:
map
def _fixup_shape(x, y):
x.set_shape([None, None, None, nb_channels])
y.set_shape([None, nb_classes])
return x, y
dataset = dataset.map(_fixup_shape)
这是一个已知问题,似乎是由于 from_generator
方法在某些情况下无法正确推断形状。因此,您需要明确传递预期的输出形状。更多信息:
- https://github.com/tensorflow/tensorflow/issues/32912
- as_list() is not defined on an unknown TensorShape on y_t_rank = len(y_t.shape.as_list()) and related to metrics)