Python Multiprocessing Pool.map Causes Error in __new__

In the simple Python 3 example below, we use the multiprocessing module to process the list friends, which raises the following error:

TypeError: __new__() missing 1 required positional argument: 'name'

There is no error if we simply run the following without multiprocessing:

tom = Friend('tom')
say_hello(tom)

Is there any way to solve this problem? Thanks!

Code

import multiprocessing

def say_hello(friend):
    print('Hello', friend.name, '!')

class Friend:
    friends = {}
    def __new__(cls, name):
        if name not in cls.friends:
            cls.friends[name] = super(Friend, cls).__new__(cls)
        return cls.friends[name]

    def __init__(self, name):
        self.name = name

jack = Friend('jack')
ryan = Friend('ryan')
friends = [jack, ryan]
multiprocessing.Pool(2).map(say_hello, friends)

Full error traceback

Traceback (most recent call last):
  File "/Users/nyxynyx/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/nyxynyx/opt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/nyxynyx/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/nyxynyx/opt/anaconda3/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
TypeError: __new__() missing 1 required positional argument: 'name'

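The error occurs during unpickling: when the object is recreated in the worker process, pickle calls __new__ without the name argument, because nothing tells it which arguments to pass.

It can be reproduced without multiprocessing: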

pickle.loads(pickle.dumps(jack))

Traceback (most recent call last): 
  ...
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-9-239857af5731>", line 1, in <module>
    pickle.loads(pickle.dumps(jack))
TypeError: __new__() missing 1 required positional argument: 'name'
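To see why name goes missing, it can help to look at what the default pickling machinery records for the object. The sketch below is only an illustration, assuming the Friend class from the question; it inspects the result of __reduce_ex__ and replays the reconstruction call by hand instead of going through pickle.

```python
import pickle


class Friend:
    friends = {}

    def __new__(cls, name):
        if name not in cls.friends:
            cls.friends[name] = super().__new__(cls)
        return cls.friends[name]

    def __init__(self, name):
        self.name = name


jack = Friend('jack')

# Ask the object how it wants to be pickled with the default protocol.
reduced = jack.__reduce_ex__(pickle.DEFAULT_PROTOCOL)
func, args, state = reduced[0], reduced[1], reduced[2]

# Only the class is recorded as an argument for __new__; the name
# lives in the state dict, which is applied after the object exists.
print(args)
print(state)

# Replaying the reconstruction call reproduces the TypeError.
try:
    func(*args)
    error = None
except TypeError as exc:
    error = str(exc)
print(error)
```

The recorded arguments contain only the class, so __new__ is called as Friend.__new__(Friend), and the name is restored afterwards from the state dict, which is too late for a __new__ that requires it.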

The solution is to implement object.__getnewargs__() or object.__getnewargs_ex__(). From the documentation:

object.__getnewargs__()

This method serves a similar purpose as __getnewargs_ex__(), but supports only positional arguments. It must return a tuple of arguments args which will be passed to the __new__() method upon unpickling.

__getnewargs__() will not be called if __getnewargs_ex__() is defined.

Changed in version 3.6: Before Python 3.6, __getnewargs__() was called instead of __getnewargs_ex__() in protocols 2 and 3.

So in your case, add this method to the Friend class:

def __getnewargs__(self):
    return self.name,
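Put together with the class from the question, a minimal sketch might look like the following. It checks the fix with a plain pickle round trip instead of a Pool, on the assumption that whatever pickles correctly here will also survive the trip to a worker process.

```python
import pickle


class Friend:
    friends = {}

    def __new__(cls, name):
        if name not in cls.friends:
            cls.friends[name] = super().__new__(cls)
        return cls.friends[name]

    def __init__(self, name):
        self.name = name

    def __getnewargs__(self):
        # Positional arguments handed to __new__ when unpickling.
        return (self.name,)


jack = Friend('jack')
restored = pickle.loads(pickle.dumps(jack))

print(restored.name)
# In the same process, __new__ finds 'jack' in the cache and returns it.
print(restored is jack)
```

Note that in a worker process the friends cache may start out empty (always under the spawn start method), so the unpickled object there is a separate instance with the same name, not the parent's object.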

Alternatively, define __reduce__ so that objects of the Friend class are picklable (serializable) and can be sent to other processes.

import multiprocessing

def say_hello(friend):
    print('Hello', friend.name, '!')

class Friend:
    friends = {}
    def __new__(cls, name):
        # Reuse the cached instance for this name, creating it on first use.
        if name not in cls.friends:
            cls.friends[name] = super(Friend, cls).__new__(cls)
        return cls.friends[name]

    def __init__(self, name):
        self.name = name

    def __reduce__(self):
        return self.__class__, (self.name,)

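if __name__ == '__main__':
    # The guard is needed on platforms that start workers with "spawn".
    jack = Friend('jack')
    ryan = Friend('ryan')
    friends = [jack, ryan]
    with multiprocessing.Pool(2) as pool:
        pool.map(say_hello, friends)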
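For comparison with __getnewargs__, here is a small sketch of what this __reduce__ hands to pickle. It assumes the caching __new__ from the question and rebuilds the object by calling the returned pair directly, which is what unpickling does with it.

```python
class Friend:
    friends = {}

    def __new__(cls, name):
        if name not in cls.friends:
            cls.friends[name] = super().__new__(cls)
        return cls.friends[name]

    def __init__(self, name):
        self.name = name

    def __reduce__(self):
        # (callable, args): unpickling calls callable(*args).
        return self.__class__, (self.name,)


jack = Friend('jack')

factory, args = jack.__reduce__()
# Equivalent to Friend('jack'): runs __new__ and then __init__.
rebuilt = factory(*args)

print(factory is Friend, args)
print(rebuilt.name)
```

One difference worth keeping in mind: because this __reduce__ returns only the class and the name, any other attributes set on the instance are not carried over, whereas __getnewargs__ leaves the default state handling in place.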