Cannot send `torch.tensor`s larger than approx. 64MB through `multiprocessing.Pipe()`

When I try to send a `torch.tensor` through `multiprocessing.Pipe()`, I notice there is a hard limit on the sendable tensor size of approx. 64MB. If I increase the tensor beyond this limit by just one more element, `pipe_connection.send(tensor)` fails with `Process finished with exit code 135 (interrupted by signal 7: SIGEMT)`; see the detailed code below. Since Pipes are used under the hood by Queues, Pools etc., this limit breaks the whole `multiprocessing` module when dealing with larger objects.

Problem:

I first noticed this while using `multiprocessing.Pool().map_async()`, which did not fail but simply got stuck, without any exception, on the internal `Pool()._outqueue.put()` call. `Pipe` is in turn built on top of `multiprocessing.reduction.ForkingPickler` and `socket.socketpair()`, but I could not find any size limit defined there either. The exact limit even varies slightly depending on which other Python kernels are running in the background. I still have 64GB of RAM free, and we are talking about 64MB objects...
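Since `Pipe` sits on top of `ForkingPickler` as described above, the payload that actually goes through the socket can be inspected by serializing an object the same way `Connection.send()` does internally. A minimal stdlib-only sketch:

```python
import pickle
from multiprocessing.reduction import ForkingPickler

# Serialize an object the same way Connection.send() does internally
obj = list(range(1_000_000))
buf = ForkingPickler.dumps(obj)  # returns a memoryview over the pickle bytes
print('serialized size: {:.2f}MB'.format(buf.nbytes / 1024 ** 2))

# The receiving end of the pipe simply unpickles the raw bytes again
assert pickle.loads(bytes(buf)) == obj
```

This makes it easy to check how large the pickled payload of a given tensor really is before blaming the pipe itself.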

import multiprocessing as mp
import torch


def generate_ones_tensor(num_dims):
    print('ONES: generating torch.ones() of shape = ({},)'.format(num_dims))
    ones = torch.ones(size=(num_dims,), dtype=torch.int64)
    print('ONES: torch.ones(size=({},)) generated of approx. size = {}MB'.format(
        num_dims, round(8 * num_dims / (1024 ** 2), 6)))
    return ones

if __name__ == '__main__':
    # Maximum shape of torch tensor that WILL be sent through a pipe
    num_max_dims = 838784
    max_ones = generate_ones_tensor(num_dims=num_max_dims)
    # Minimum shape of torch tensor that WON'T be sent through a pipe
    num_too_many_dims = 8387585
    over_max_ones = generate_ones_tensor(num_dims=num_too_many_dims)

    # Create pipe with connections
    p1, p2 = mp.Pipe()

    # Sending max. byte size
    p1.send(max_ones)
    print('max_ones was sent.')
    received_max_ones = p2.recv()
    print('max_ones was received.')
    # Sending too many bytes
    p1.send(over_max_ones)
    print('over_max_ones was sent.')
    received_over_max_ones = p2.recv()
    print('over_max_ones was received.')

Produced output:

Connected to pydev debugger (build 191.7479.30)
ONES: generating torch.ones() of shape = (838784,)
ONES: torch.ones(size=(838784,)) generated of approx. size = 6.399414MB
ONES: generating torch.ones() of shape = (8387585,)
ONES: torch.ones(size=(8387585,)) generated of approx. size = 63.992195MB
max_ones was sent.
max_ones was received.

Process finished with exit code 135 (interrupted by signal 7: SIGEMT)

Edit:

I tried to work around the problem by chunking the tensors down to a size of 21MB during the `Pool().starmap_async()` computation. The chunked tensors are stored in temporary files by the `AsyncResult` instance. But after 3 tensors it throws a `RuntimeError`:

[DEBUG/ForkPoolWorker-1] starting listener and thread for sending handles
[INFO/ForkPoolWorker-1] created temp directory /tmp/pymp-p9qf5b6r
[DEBUG/ForkPoolWorker-4] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-5] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-2] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-2] starting listener and thread for sending handles
[INFO/ForkPoolWorker-2] created temp directory /tmp/pymp-ubm3dwnp
[DEBUG/ForkPoolWorker-5] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-6] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-3] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-6] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-7] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-4] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-4] Possible encoding error while sending result: Error sending result: '[tensor([[[40478,   547,  1999,  ...,     0,     0,     0],
         [40478,   547,  1999,  ...,     0,     0,     0]],

        [[40478,   547,  1999,  ...,     0,     0,     0],
         [40478,   547,  1999,  ...,     0,     0,     0]],

        [[40478,   547,  1999,  ...,     0,     0,     0],
         [40478,   547,  1999,  ...,     0,     0,     0]],

        ...,

        [[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]],

        [[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]],

        [[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]]])]'. Reason: 'RuntimeError('unable to write to file </torch_329_1813456617>',)'
[DEBUG/ForkPoolWorker-7] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-5] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-5] Possible encoding error while sending result: Error sending result: '[tensor([[[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]],

        [[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]],

        [[40478,   547,  3898,  ...,     0,     0,     0],
         [40478,   547,  3898,  ...,     0,     0,     0]],

        ...,
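For context on the `RuntimeError('unable to write to file </torch_…>')` above: `torch.multiprocessing` shares tensors between processes by backing their storage with files in shared memory (on Linux, entries like `torch_<pid>_<handle>` under `/dev/shm`). If that filesystem is too small or already full, the write fails with exactly this error. A quick stdlib check (the `/dev/shm` path assumes Linux):

```python
import os
import shutil

# How much shared memory is available for torch's tensor-sharing files?
usage = shutil.disk_usage('/dev/shm')
print('shm total: {:.1f}MB, free: {:.1f}MB'.format(
    usage.total / 1024 ** 2, usage.free / 1024 ** 2))

# Leftover segments from crashed workers can also fill up /dev/shm
leftovers = [f for f in os.listdir('/dev/shm') if f.startswith('torch_')]
print('stale torch_* segments:', leftovers)
```

If free shared memory is smaller than the tensors being shipped, that alone explains the failure regardless of available RAM.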

Workaround

Thanks to @Fabrizio, the problem could be identified as system-specific and could not be reproduced on another system. For anyone who might still run into the same issue, a simple workaround is to convert the tensor into a binary Python object, e.g. with `pickle.dumps(tensor)`, before sending it through the pipe, or to pickle the return value of the target func in `Pool().map_async()`:

import pickle

def target_func(*args, **kwargs):
    # compute your results
    result = do_your_stuff(...)
    return pickle.dumps(result)
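On the receiving side, the extra pickling layer just has to be undone with `pickle.loads()`. A self-contained sketch of the full round trip, with a stand-in computation instead of `do_your_stuff`:

```python
import pickle
from multiprocessing import Pool


def target_func(x):
    # Stand-in computation; the result is returned pre-pickled so the
    # pool only has to ship an opaque bytes object through its pipes
    result = [x] * 3
    return pickle.dumps(result)


if __name__ == '__main__':
    with Pool(processes=2) as pool:
        async_result = pool.map_async(target_func, [1, 2])
        # Undo the extra pickling layer on the parent side
        results = [pickle.loads(blob) for blob in async_result.get()]
    print(results)  # [[1, 1, 1], [2, 2, 2]]
```

The double pickling costs a little CPU, but the pool then never has to reduce the tensor itself, only a flat bytes payload.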

Let me know if you run into the same problem...