如何更改多处理模块使用的序列化方法?

How to change the serialization method used by the multiprocessing module?

如何更改 Python multiprocessing 库使用的序列化方法?特别是,默认序列化方法使用 pickle 库和 Python 版本的默认 pickle 协议版本。默认的 pickle 协议是 Python 2.7 中的版本 2 和 Python 3.6 中的版本 3。如何在 Python 3.6 中将协议版本设置为 2,以便我可以在 multiprocessing 中使用一些 类(如 ClientListener) Python 2.7 的服务器处理 运行 和 Python 3.6 的客户端进程 运行 之间进行通信的库?

(旁注:作为测试,我通过在 dump() 调用中添加 protocol=2 来修改 line 206 of multiprocessing/connection.py 以强制协议版本为 2 和我的 client/server 进程在服务器 运行 2.7 和客户端 3.6 的有限测试中工作。

在Python 3.6中,合并了一个patch让serializer可以设置,但是这个patch没有文档说明,一直没弄明白怎么用。这是我尝试使用它的方式(我也将其发布到我链接到的 Python 票证上):

pickle2reducer.py:

from multiprocessing.reduction import ForkingPickler, AbstractReducer

class ForkingPickler2(ForkingPickler):
    def __init__(self, *args):
        if len(args) > 1:
            args[1] = 2
        else:
            args.append(2)
        super().__init__(*args)

    @classmethod
    def dumps(cls, obj, protocol=2):
        return ForkingPickler.dumps(obj, protocol)


def dump(obj, file, protocol=2):
    ForkingPickler2(file, protocol).dump(obj)


class Pickle2Reducer(AbstractReducer):
    ForkingPickler = ForkingPickler2
    register = ForkingPickler2.register
    dump = dump

在我的客户中:

import pickle2reducer
multiprocessing.reducer = pickle2reducer.Pickle2Reducer()

在使用 multiprocessing 做任何其他事情之前位于顶部。当我执行此操作时,Python 2.7 仍然在服务器 运行 上看到 ValueError: unsupported pickle protocol: 3

我相信如果您使用的是多处理 "context" object

使用您的 pickle2reducer.py,您的客户应该以:

import pickle2reducer
import multiprocessing as mp

ctx = mp.get_context()
ctx.reducer = pickle2reducer.Pickle2Reducer()

并且 ctxmultiprocessing 具有相同的 API。

希望对您有所帮助!

非常感谢。它准确地引导我找到了我需要的解决方案。我最终做了类似的事情,但修改了连接 class。对我来说,这比自己制作完整的子 class 并替换它更干净。

from multiprocessing.connection import Connection, _ForkingPickler, Client, Listener

def send_py2(self, obj):
    self._check_closed()
    self._check_writable()
    self._send_bytes(_ForkingPickler.dumps(obj, protocol=2))

Connection.send = send_py2

这正是 multiprocessing.connection 中的代码,只是添加了 protocol=2 参数。

我想你甚至可以通过直接编辑 multiprocessing.reduction.

中的原始 ForkingPickler class 来做同样的事情