Python 具有共享数据源和多个 class 实例的多重处理
Python Multiprocessing with shared data source and multiple class instances
我的程序需要生成 class 的多个实例,每个实例都处理来自流数据源的数据。
例如:
parameters = [1, 2, 3]
class FakeStreamingApi:
def __init__(self):
pass
def data(self):
return 42
pass
class DoStuff:
def __init__(self, parameter):
self.parameter = parameter
def run(self):
data = streaming_api.data()
output = self.parameter ** 2 + data # Some CPU intensive task
print output
streaming_api = FakeStreamingApi()
# Here's how this would work with no multiprocessing
instance_1 = DoStuff(parameters[0])
instance_1.run()
一旦实例 运行ning 它们就不需要相互交互,它们只需要获取传入的数据。(并打印错误消息等)
我完全不知道如何使它与多处理一起工作,因为我首先必须创建一个 class DoStuff 的新实例,然后拥有它 运行.
这绝对不是办法:
# Let's try multiprocessing
import multiprocessing
for parameter in parameters:
processes = [ multiprocessing.Process(target = DoStuff, args = (parameter)) ]
# Hmm, this doesn't work...
我们可以尝试定义一个函数来生成 classes,但这看起来很难看:
import multiprocessing
def spawn_classes(parameter):
instance = DoStuff(parameter)
instance.run()
for parameter in parameters:
processes = [ multiprocessing.Process(target = spawn_classes, args = (parameter,)) ]
# Can't tell if it works -- no output on screen?
另外,我不想拥有 API 接口的 3 个不同副本 class 运行ning,我希望在所有进程之间共享该数据。 . 据我所知,多处理为每个新进程创建所有内容的副本。
想法?
编辑:
我想我可能已经明白了...这有什么问题吗?
import multiprocessing
parameters = [1, 2, 3]
class FakeStreamingApi:
def __init__(self):
pass
def data(self):
return 42
pass
class Worker(multiprocessing.Process):
def __init__(self, parameter):
super(Worker, self).__init__()
self.parameter = parameter
def run(self):
data = streaming_api.data()
output = self.parameter ** 2 + data # Some CPU intensive task
print output
streaming_api = FakeStreamingApi()
if __name__ == '__main__':
jobs = []
for parameter in parameters:
p = Worker(parameter)
jobs.append(p)
p.start()
for j in jobs:
j.join()
我得出的结论是,有必要使用multiprocessing.Queues来解决这个问题。数据源(流 API)需要将数据的副本传递给所有不同的进程,以便它们可以使用它。
还有另一种方法可以使用 multiprocessing.Manager 创建一个共享字典来解决这个问题,但我没有进一步探索它,因为它看起来效率很低并且无法将更改传播到内部值(例如,如果你有列表字典,对内部列表的更改不会传播)。
我的程序需要生成 class 的多个实例,每个实例都处理来自流数据源的数据。
例如:
parameters = [1, 2, 3]
class FakeStreamingApi:
def __init__(self):
pass
def data(self):
return 42
pass
class DoStuff:
def __init__(self, parameter):
self.parameter = parameter
def run(self):
data = streaming_api.data()
output = self.parameter ** 2 + data # Some CPU intensive task
print output
streaming_api = FakeStreamingApi()
# Here's how this would work with no multiprocessing
instance_1 = DoStuff(parameters[0])
instance_1.run()
一旦实例 运行ning 它们就不需要相互交互,它们只需要获取传入的数据。(并打印错误消息等)
我完全不知道如何使它与多处理一起工作,因为我首先必须创建一个 class DoStuff 的新实例,然后拥有它 运行.
这绝对不是办法:
# Let's try multiprocessing
import multiprocessing
for parameter in parameters:
processes = [ multiprocessing.Process(target = DoStuff, args = (parameter)) ]
# Hmm, this doesn't work...
我们可以尝试定义一个函数来生成 classes,但这看起来很难看:
import multiprocessing
def spawn_classes(parameter):
instance = DoStuff(parameter)
instance.run()
for parameter in parameters:
processes = [ multiprocessing.Process(target = spawn_classes, args = (parameter,)) ]
# Can't tell if it works -- no output on screen?
另外,我不想拥有 API 接口的 3 个不同副本 class 运行ning,我希望在所有进程之间共享该数据。 . 据我所知,多处理为每个新进程创建所有内容的副本。
想法?
编辑: 我想我可能已经明白了...这有什么问题吗?
import multiprocessing
parameters = [1, 2, 3]
class FakeStreamingApi:
def __init__(self):
pass
def data(self):
return 42
pass
class Worker(multiprocessing.Process):
def __init__(self, parameter):
super(Worker, self).__init__()
self.parameter = parameter
def run(self):
data = streaming_api.data()
output = self.parameter ** 2 + data # Some CPU intensive task
print output
streaming_api = FakeStreamingApi()
if __name__ == '__main__':
jobs = []
for parameter in parameters:
p = Worker(parameter)
jobs.append(p)
p.start()
for j in jobs:
j.join()
我得出的结论是,有必要使用multiprocessing.Queues来解决这个问题。数据源(流 API)需要将数据的副本传递给所有不同的进程,以便它们可以使用它。
还有另一种方法可以使用 multiprocessing.Manager 创建一个共享字典来解决这个问题,但我没有进一步探索它,因为它看起来效率很低并且无法将更改传播到内部值(例如,如果你有列表字典,对内部列表的更改不会传播)。