火炬数据管道
Pytorch data pipeline
我正在尝试实现一个类似解决方案的有界缓冲区,其中数据生成器和模型作为两个独立的进程工作。数据生成器预处理数据并存储在共享队列中(使用预定义的最大大小来限制内存使用)。另一方面,该模型以自己的节奏使用该队列中的数据,直到队列为空。下面是我的实现片段。
'''
self._buffer is an object of multiprocessing.Queue
'''
def produce(self):
for obj in self._generator:
self._buffer.put(obj=obj, block=True, timeout=None)
self._buffer.put(obj=None)
def consume(self):
while True:
dat = self._buffer.get(block=True, timeout=None)
if dat is None:
break
else:
# Train model on `dat`
def run(self):
pt = multiprocessing.Process(target=self.produce)
ct = multiprocessing.Process(target=self.consume)
pt.start()
ct.start()
pt.join()
ct.join()
但是,上述解决方案不起作用。我按照文档中的说明使用了 torch.multiprocessing
。我还设置了 torch.multiprocessing.set_start_method('spawn')
以避免“RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
”
但现在我得到“TypeError: cannot pickle 'generator' object
”。如何解决?
由于您使用的是 pytorch,因此您应该使用 Dataset and Dataloader 方法。这会为您处理多处理、共享内存等所有问题。
你可以map style datasets or things like iterable-style。最好阅读官方文档,了解它们是什么以及它们是如何工作的。
在您的情况下,您可能对可迭代样式的数据集没问题。我对类似的情况使用了这两种方法。您可以拥有可迭代样式数据集,如果您不知道要处理多少样本,则可能需要它。对于其他情况,我有一个地图样式的数据集,我事先知道我的样本总数(例如处理目录中的所有图像)并且可以使用顺序采样器按顺序给我元素。
关于你的一个问题。当您有无法序列化的对象时,会发生所有类似 TypeError: cannot pickle 'generator' object
的错误。对于序列化,使用 pickle
。在您的情况下 self._generator 似乎是一个由于某种原因无法序列化的对象。没有代码就不可能说出原因。我遇到过使用 pybind 创建的包装的 c++ 包的情况,其中对象不可序列化,或者我在某处有一些 mutex
变量。
我正在尝试实现一个类似解决方案的有界缓冲区,其中数据生成器和模型作为两个独立的进程工作。数据生成器预处理数据并存储在共享队列中(使用预定义的最大大小来限制内存使用)。另一方面,该模型以自己的节奏使用该队列中的数据,直到队列为空。下面是我的实现片段。
'''
self._buffer is an object of multiprocessing.Queue
'''
def produce(self):
for obj in self._generator:
self._buffer.put(obj=obj, block=True, timeout=None)
self._buffer.put(obj=None)
def consume(self):
while True:
dat = self._buffer.get(block=True, timeout=None)
if dat is None:
break
else:
# Train model on `dat`
def run(self):
pt = multiprocessing.Process(target=self.produce)
ct = multiprocessing.Process(target=self.consume)
pt.start()
ct.start()
pt.join()
ct.join()
但是,上述解决方案不起作用。我按照文档中的说明使用了 torch.multiprocessing
。我还设置了 torch.multiprocessing.set_start_method('spawn')
以避免“RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
”
但现在我得到“TypeError: cannot pickle 'generator' object
”。如何解决?
由于您使用的是 pytorch,因此您应该使用 Dataset and Dataloader 方法。这会为您处理多处理、共享内存等所有问题。
你可以map style datasets or things like iterable-style。最好阅读官方文档,了解它们是什么以及它们是如何工作的。
在您的情况下,您可能对可迭代样式的数据集没问题。我对类似的情况使用了这两种方法。您可以拥有可迭代样式数据集,如果您不知道要处理多少样本,则可能需要它。对于其他情况,我有一个地图样式的数据集,我事先知道我的样本总数(例如处理目录中的所有图像)并且可以使用顺序采样器按顺序给我元素。
关于你的一个问题。当您有无法序列化的对象时,会发生所有类似 TypeError: cannot pickle 'generator' object
的错误。对于序列化,使用 pickle
。在您的情况下 self._generator 似乎是一个由于某种原因无法序列化的对象。没有代码就不可能说出原因。我遇到过使用 pybind 创建的包装的 c++ 包的情况,其中对象不可序列化,或者我在某处有一些 mutex
变量。