Python 具有共享数据源和多个 class 实例的多重处理

Python Multiprocessing with shared data source and multiple class instances

我的程序需要生成 class 的多个实例,每个实例都处理来自流数据源的数据。

例如:

parameters = [1, 2, 3]

class FakeStreamingApi:
    def __init__(self):
        pass

    def data(self):
        return 42
    pass

class DoStuff:
    def __init__(self, parameter):
        self.parameter = parameter

    def run(self):
        data = streaming_api.data()
        output = self.parameter ** 2 + data # Some CPU intensive task
        print output

streaming_api = FakeStreamingApi()

# Here's how this would work with no multiprocessing
instance_1 = DoStuff(parameters[0])
instance_1.run()

一旦实例 运行ning 它们就不需要相互交互,它们只需要获取传入的数据。(并打印错误消息等)

我完全不知道如何使它与多处理一起工作,因为我首先必须创建一个 class DoStuff 的新实例,然后拥有它 运行.

这绝对不是办法:

# Let's try multiprocessing
import multiprocessing

for parameter in parameters:
    processes = [ multiprocessing.Process(target = DoStuff, args = (parameter)) ]

# Hmm, this doesn't work...

我们可以尝试定义一个函数来生成 classes,但这看起来很难看:

import multiprocessing

def spawn_classes(parameter):
    instance = DoStuff(parameter)
    instance.run()

for parameter in parameters:
        processes = [ multiprocessing.Process(target = spawn_classes, args = (parameter,)) ]

# Can't tell if it works -- no output on screen?

另外,我不想拥有 API 接口的 3 个不同副本 class 运行ning,我希望在所有进程之间共享该数据。 . 据我所知,多处理为每个新进程创建所有内容的副本。

想法?

编辑: 我想我可能已经明白了...这有什么问题吗?

import multiprocessing

parameters = [1, 2, 3]

class FakeStreamingApi:
    def __init__(self):
        pass

    def data(self):
        return 42
    pass

class Worker(multiprocessing.Process):
    def __init__(self, parameter):
        super(Worker, self).__init__()
        self.parameter = parameter

    def run(self):
        data = streaming_api.data()
        output = self.parameter ** 2 + data # Some CPU intensive task
        print output

streaming_api = FakeStreamingApi()

if __name__ == '__main__':
    jobs = []
    for parameter in parameters:
        p = Worker(parameter)
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

我得出的结论是,有必要使用multiprocessing.Queues来解决这个问题。数据源(流 API)需要将数据的副本传递给所有不同的进程,以便它们可以使用它。

还有另一种方法可以使用 multiprocessing.Manager 创建一个共享字典来解决这个问题,但我没有进一步探索它,因为它看起来效率很低并且无法将更改传播到内部值(例如,如果你有列表字典,对内部列表的更改不会传播)。