初始化分布式工作人员的状态

Initializing state on dask-distributed workers

我正在尝试做类似

的事情
resource = MyResource()
def fn(x):
   something = dosemthing(x, resource)
   return something

client = Client()
results = client.map(fn, data)

问题是 resource 不可序列化且构造成本高。 因此,我想在每个工人身上构建一次,并可供 fn.

使用

我该怎么做? 或者是否有其他方法可以让所有工作人员都可以使用 resource

你总是可以构造一个惰性资源,比如

class GiveAResource():
    resource = [None]
    def get_resource(self):
        if self.resource[0] is None:
            self.resource[0] = MyResource()
        return self.resource[0]

它的一个实例将在进程之间很好地序列化,因此您可以将它作为输入包含在要在 worker 上执行的任何函数中,然后在其上调用 .get_resource() 将获得您本地的昂贵资源(这将在以后出现的任何工人身上重制。

这个 class 最好在模块中定义,而不是动态代码。

这里没有加锁,所以如果多个线程同时请求目前还不需要的资源,你会得到多余的工作。