在 Python 中通过套接字在两个进程之间传递共享内存对象

Passing shared memory object between two processes via socket in Python

我的应用程序包含两个通过 TCP 套接字连接的 Python 进程。第一个进程旨在读取文件并将它们放在共享内存中而不是下推套接字,以便其他进程可以读取它们。

import os
import zmq
import mmap
import time
import ctypes
import pickle
import dill
import tempfile
import multiprocessing as mp

MEETING_POINT = 'tcp://127.0.0.1:12345'
FILEFD, FILEPATH = tempfile.mkstemp()


def run_server():
    Server().run()


def read_file():
    return open(FILEPATH, 'r').read()


class Client(object):
    def __init__(self):
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.DEALER)
        self.socket.connect(MEETING_POINT)

    def cleanup(self):
        self.socket.close()
        self.ctx.term()

    def run(self):
        self.socket.send(dill.dumps(read_file()))


class Server(object):
    def __init__(self):
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.DEALER)
        self.socket.bind(MEETING_POINT)

    def cleanup(self):
        self.socket.close()
        self.ctx.term()

    def run(self):
        f = dill.loads(self.socket.recv())
        print(f)


def main():
    with open(FILEPATH, 'w') as fd:
        fd.write('blah')

    mp.Process(target=run_server).start()
    time.sleep(.5)  # a (poor, but tolerable) alternative to handshaking
    Client().run()


if __name__ == '__main__':
    main()

我的问题归结为:如何通过套接字传递有关内存段的信息,第二个进程将访问该内存段?我尝试了以下方法(pickledill),但都无济于事:

结果:pickle.PicklingError: Can't pickle <class 'multiprocessing.sharedctypes.c_byte_Array_4'>: it's not found as multiprocessing.sharedctypes.c_byte_Array_4 在客户端酸洗时。

我相信有一种更简单(且有效)的方法可以做到这一点,是吗?我无法预读所有必要的文件并将它们打包到 multiprocessing.Array 中以在创建服务器进程时作为 args 传递,当然这会(据说)解决所有问题。

回答我自己的问题,因为我想 确切地 我要做什么。根据 man mmap,为了让第二个进程能够读取第一个进程的内存,创建一个带有 MAP_SHARED 标志的内存段就足够了。因此,在 mmap.mmap 的构造函数中默认设置了这个标志,所有需要做的就是传递文件名而不是 class 实例:

# sometime during the first process' execution
fd = open(filepath_to_process, 'w')
memory = mmap.mmap(fd.fileno(), 0)
socket.send(filepath_to_process)

# on the other side of socket, this happens:
unprocessed_filepath = socket.recv()
fresh_fd = open(unprocessed_filepath, 'r')
mm = mmap.mmap(fresh_fd.fileno(), 0)

# voilà! All the changes are now propagated to the second process

当然,文件和内存区域的访问模式必须匹配,但是 man mmap 中有一个单独的部分,称为 "ERRORS".

至于共享内存,有posix_ipc and sysv_ipc个模块(后者支持更广泛的操作系统)。