Subclassing multiprocessing.queues.Queue: attributes set by parent not available to child?

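I'm using the multiprocessing library in Python 3.7, where a parent process communicates with child processes through a queue. A plain multiprocessing.queues.Queue works fine, but when I subclass it to add some extra features to the queue, the attributes I add are set correctly in the parent yet seem to be lost when accessed in the child. Any idea how to make the attributes persist in the child?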

import sys
import time
import multiprocessing
import multiprocessing.queues

# Subclass multiprocessing.queues.Queue to add some useful features
class q_class(multiprocessing.queues.Queue):
    def __init__(self):
        self.report_bottlenecks = False
        self.bottleneck_time = 1.0 #in seconds
        super(q_class, self).__init__(ctx=multiprocessing.get_context())
    def put(self,header,payload=None):
        message = {}
        message['header'] = header
        message['payload'] = payload
        message['put_time'] = time.time()
        super(q_class, self).put(message)
    def get(self):
        message = super(q_class, self).get()
        message['get_time'] = time.time()
        message['queue_time'] = message['get_time'] - message['put_time']
        if self.report_bottlenecks:
            if message['queue_time']>self.bottleneck_time:
                # plain print(): this class never defines a self.debug attribute
                print('Queue bottleneck: '+str(int(message['queue_time']))+' seconds.')
        return message

def function_for_child_to_run(the_q):
    print('Child process started')
    print(the_q.report_bottlenecks) #currently causes an AttributeError: 'q_class' object has no attribute 'report_bottlenecks'

if __name__ == '__main__':
    multiprocessing.set_start_method('forkserver')
    my_q = q_class()
    print(my_q.report_bottlenecks) #should print "False"
    process = multiprocessing.Process( target=function_for_child_to_run , args=(my_q,) )
    process.start()
    time.sleep(2) #give the child time to start & print
    process.join()
    sys.exit()

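To be used with forkserver, your queue has to be pickled. multiprocessing.queues.Queue defines __getstate__ and __setstate__ methods for pickling and unpickling its instance attributes. Those methods only handle the attributes the base class knows about, so when you add attributes to a subclassed queue you have to extend both methods with your custom attributes, otherwise they are lost.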
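To see the mechanism in isolation, here is a minimal sketch that does not involve multiprocessing at all. Base and Child are hypothetical stand-ins for the library queue and the subclass: the base class lists its attributes explicitly in __getstate__, so anything a subclass adds is silently dropped on a pickle round trip.

```python
import pickle


class Base:
    """Stand-in for a class that pickles a fixed set of attributes."""

    def __init__(self):
        self.size = 10

    def __getstate__(self):
        # Only the attributes named here end up in the pickle.
        return (self.size,)

    def __setstate__(self, state):
        (self.size,) = state


class Child(Base):
    """Subclass that adds an attribute but does not extend the state."""

    def __init__(self):
        super().__init__()
        self.extra = "set in parent"


clone = pickle.loads(pickle.dumps(Child()))
print(hasattr(clone, "size"))   # True
print(hasattr(clone, "extra"))  # False: dropped by Base.__getstate__
```

Unpickling does not call __init__, which is why the attribute is missing entirely in the copy rather than reset to its default.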

Add this to your code:

from multiprocessing import context
...
class QClass(multiprocessing.queues.Queue):
    ...
    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid,
                self.report_bottlenecks, self.bottleneck_time)  # <---

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid,
         self.report_bottlenecks, self.bottleneck_time) = state  # <---
        self._after_fork()
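The snippet above copies the base class's private attribute tuple and its post-unpickle hook, which are implementation details of the Python version in the question (3.7) and may differ in other versions. As a hedged alternative, not part of the original answer, the subclass can delegate to the base class with super() and only append its own attributes. This is a sketch assuming the base class keeps providing __getstate__ and __setstate__:

```python
import multiprocessing
import multiprocessing.queues


class QClass(multiprocessing.queues.Queue):
    def __init__(self):
        self.report_bottlenecks = False
        self.bottleneck_time = 1.0  # in seconds
        super().__init__(ctx=multiprocessing.get_context())

    def __getstate__(self):
        # Let the base class build its own state (it also checks that the
        # queue is being pickled for a child process), then append ours.
        return (super().__getstate__(),
                self.report_bottlenecks, self.bottleneck_time)

    def __setstate__(self, state):
        base_state, self.report_bottlenecks, self.bottleneck_time = state
        # The base class restores its attributes and re-initialises itself.
        super().__setstate__(base_state)
```

The custom put and get methods from the question can be added to this class unchanged. The child then receives a queue whose report_bottlenecks and bottleneck_time hold the values they had in the parent when the process was started.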