在 dask Client.map() 调用期间会发生什么?

What happens during dask Client.map() call?

我正在尝试使用 dask 编写网格搜索实用程序。 objective 函数调用了一个 class 的方法,其中包含一个大数据名。我正在尝试使用 dask 将计算并行化为多核解决方案,而无需复制原始 class/dataframe。我没有在文档中找到任何解决方案,所以我在这里发布了一个玩具示例:

import pickle
from dask.distributed import Client, LocalCluster
from multiprocessing import current_process


class TestClass:
    def __init__(self):
        self.param = 0

    def __getstate__(self):
        print("I am pickled!")
        return self.__dict__

    def loss(self, ext_param):
        self.param += 1
        print(f"{current_process().pid}: {hex(id(self))}:  {self.param}: {ext_param} ")
        return f"{self.param}_{ext_param}"


def objective_function(param):
    return test_instance.loss(param)

if __name__ == '__main__':

    test_instance = TestClass()
    print(hex(id(test_instance)))
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)
    futures = client.map(objective_function, range(20))
    result = client.gather(futures)
    print(result)
    
# ---- OUTPUT RESULTS ----
# 0x7fe0a5056d30
# I am pickled!
# I am pickled!
# 11347: 0x7fb9bcfa0588:  1: 0
# 11348: 0x7fb9bd0a2588:  1: 1
# 11347: 0x7fb9bcf94240:  1: 2
# 11348: 0x7fb9bd07b6a0:  1: 3
# 11347: 0x7fb9bcf945f8:  1: 4 
# ['1_0', '1_1', '1_2', '1_3', '1_4']

我有以下问题:

  1. 为什么下面的 pickle 函数被调用了两次?
  2. 我注意到 map 函数的每次迭代都使用 test_instance 的新副本,正如您从每次迭代的不同 class 地址以及从事实上,test_instance.param 属性在每次迭代时都设置为 0(此行为不同于 multiprocessing.Pool 的标准实现,我已突出显示 here)。我假设在每次迭代期间,每个进程都会收到一份腌制的新副本 class - 对吗?
  3. 根据(2),计算时内存中有多少份test_instance?它是 1(对于主线程中的原始实例)+ 1(腌制副本)+ 2(每个进程中存在的实例)= 4 吗?有什么办法可以让这个值变成1吗?

我注意到一些共享内存解决方案可以通过使用 this github issue 中提出的 Ray 库获得。

Why is the following pickle function called twice?

通常,python 的 pickle 有效地将实例变量和对导入模块中的 class 的引用捆绑在一起。在 __main__ 中,这可能是不可靠的,dask 回退到 cloudpickle(内部也调用 pickle)。在我看来,检查 "__main__" in distributed.protocol.pickle.dumps 可能发生在第一次尝试 pickle 之前。

during each iteration each process will receive a fresh copy of the pickled class

是的。每次 dask 运行s 一个任务,它反序列化输入,创建实例的 nw 副本。请注意,您的 dask worker 可能是通过 fork_server 技术创建的,因此不会简单地复制内存(这是做事的安全方法)。

你可以在计算之前将实例“分散”给工作人员,他们可以重用他们的本地副本,但是 dask 任务不应该通过改变对象来工作,而是通过返回结果(即功能上)。

how many copies of test_instance are in memory

1 个在客户端,加上每个正在执行的任务一个。序列化的版本也可能在附近,可能是一个保存在图中,它暂时在客户端,然后在调度程序上保存;在反序列化时,它也会暂时存在于工作内存中。对于某些类型,zero-copy de/ser 是可能的。

如果任务因为对象的大小而非常大,你一定要事先“分散”它们(client.scatter)。

Is there any way to get this value to 1?

您可以 运行 调度程序 and/or worker in-process 共享内存,但是,当然,您会失去与 GIL 的并行性。

也许你可以试试 Actor interface?该模式似乎符合您的工作流程。