在 dask Client.map() 调用期间会发生什么?
What happens during dask Client.map() call?
我正在尝试使用 dask 编写网格搜索实用程序。 objective 函数调用了一个 class 的方法,其中包含一个大数据名。我正在尝试使用 dask 将计算并行化为多核解决方案,而无需复制原始 class/dataframe。我没有在文档中找到任何解决方案,所以我在这里发布了一个玩具示例:
import pickle
from dask.distributed import Client, LocalCluster
from multiprocessing import current_process
class TestClass:
def __init__(self):
self.param = 0
def __getstate__(self):
print("I am pickled!")
return self.__dict__
def loss(self, ext_param):
self.param += 1
print(f"{current_process().pid}: {hex(id(self))}: {self.param}: {ext_param} ")
return f"{self.param}_{ext_param}"
def objective_function(param):
return test_instance.loss(param)
if __name__ == '__main__':
test_instance = TestClass()
print(hex(id(test_instance)))
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
futures = client.map(objective_function, range(20))
result = client.gather(futures)
print(result)
# ---- OUTPUT RESULTS ----
# 0x7fe0a5056d30
# I am pickled!
# I am pickled!
# 11347: 0x7fb9bcfa0588: 1: 0
# 11348: 0x7fb9bd0a2588: 1: 1
# 11347: 0x7fb9bcf94240: 1: 2
# 11348: 0x7fb9bd07b6a0: 1: 3
# 11347: 0x7fb9bcf945f8: 1: 4
# ['1_0', '1_1', '1_2', '1_3', '1_4']
我有以下问题:
- 为什么下面的 pickle 函数被调用了两次?
- 我注意到 map 函数的每次迭代都使用
test_instance
的新副本,正如您从每次迭代的不同 class 地址以及从事实上,test_instance.param
属性在每次迭代时都设置为 0(此行为不同于 multiprocessing.Pool 的标准实现,我已突出显示 here)。我假设在每次迭代期间,每个进程都会收到一份腌制的新副本 class - 对吗?
- 根据(2),计算时内存中有多少份
test_instance
?它是 1(对于主线程中的原始实例)+ 1(腌制副本)+ 2(每个进程中存在的实例)= 4 吗?有什么办法可以让这个值变成1吗?
我注意到一些共享内存解决方案可以通过使用 this github issue 中提出的 Ray 库获得。
Why is the following pickle function called twice?
通常,python 的 pickle 有效地将实例变量和对导入模块中的 class 的引用捆绑在一起。在 __main__
中,这可能是不可靠的,dask 回退到 cloudpickle(内部也调用 pickle)。在我看来,检查 "__main__"
in distributed.protocol.pickle.dumps
可能发生在第一次尝试 pickle 之前。
during each iteration each process will receive a fresh copy of the pickled class
是的。每次 dask 运行s 一个任务,它反序列化输入,创建实例的 nw 副本。请注意,您的 dask worker 可能是通过 fork_server 技术创建的,因此不会简单地复制内存(这是做事的安全方法)。
你可以在计算之前将实例“分散”给工作人员,他们可以重用他们的本地副本,但是 dask 任务不应该通过改变对象来工作,而是通过返回结果(即功能上)。
how many copies of test_instance are in memory
1 个在客户端,加上每个正在执行的任务一个。序列化的版本也可能在附近,可能是一个保存在图中,它暂时在客户端,然后在调度程序上保存;在反序列化时,它也会暂时存在于工作内存中。对于某些类型,zero-copy de/ser 是可能的。
如果任务因为对象的大小而非常大,你一定要事先“分散”它们(client.scatter
)。
Is there any way to get this value to 1?
您可以 运行 调度程序 and/or worker in-process 共享内存,但是,当然,您会失去与 GIL 的并行性。
也许你可以试试 Actor
interface?该模式似乎符合您的工作流程。
我正在尝试使用 dask 编写网格搜索实用程序。 objective 函数调用了一个 class 的方法,其中包含一个大数据名。我正在尝试使用 dask 将计算并行化为多核解决方案,而无需复制原始 class/dataframe。我没有在文档中找到任何解决方案,所以我在这里发布了一个玩具示例:
import pickle
from dask.distributed import Client, LocalCluster
from multiprocessing import current_process
class TestClass:
def __init__(self):
self.param = 0
def __getstate__(self):
print("I am pickled!")
return self.__dict__
def loss(self, ext_param):
self.param += 1
print(f"{current_process().pid}: {hex(id(self))}: {self.param}: {ext_param} ")
return f"{self.param}_{ext_param}"
def objective_function(param):
return test_instance.loss(param)
if __name__ == '__main__':
test_instance = TestClass()
print(hex(id(test_instance)))
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
futures = client.map(objective_function, range(20))
result = client.gather(futures)
print(result)
# ---- OUTPUT RESULTS ----
# 0x7fe0a5056d30
# I am pickled!
# I am pickled!
# 11347: 0x7fb9bcfa0588: 1: 0
# 11348: 0x7fb9bd0a2588: 1: 1
# 11347: 0x7fb9bcf94240: 1: 2
# 11348: 0x7fb9bd07b6a0: 1: 3
# 11347: 0x7fb9bcf945f8: 1: 4
# ['1_0', '1_1', '1_2', '1_3', '1_4']
我有以下问题:
- 为什么下面的 pickle 函数被调用了两次?
- 我注意到 map 函数的每次迭代都使用
test_instance
的新副本,正如您从每次迭代的不同 class 地址以及从事实上,test_instance.param
属性在每次迭代时都设置为 0(此行为不同于 multiprocessing.Pool 的标准实现,我已突出显示 here)。我假设在每次迭代期间,每个进程都会收到一份腌制的新副本 class - 对吗? - 根据(2),计算时内存中有多少份
test_instance
?它是 1(对于主线程中的原始实例)+ 1(腌制副本)+ 2(每个进程中存在的实例)= 4 吗?有什么办法可以让这个值变成1吗?
我注意到一些共享内存解决方案可以通过使用 this github issue 中提出的 Ray 库获得。
Why is the following pickle function called twice?
通常,python 的 pickle 有效地将实例变量和对导入模块中的 class 的引用捆绑在一起。在 __main__
中,这可能是不可靠的,dask 回退到 cloudpickle(内部也调用 pickle)。在我看来,检查 "__main__"
in distributed.protocol.pickle.dumps
可能发生在第一次尝试 pickle 之前。
during each iteration each process will receive a fresh copy of the pickled class
是的。每次 dask 运行s 一个任务,它反序列化输入,创建实例的 nw 副本。请注意,您的 dask worker 可能是通过 fork_server 技术创建的,因此不会简单地复制内存(这是做事的安全方法)。
你可以在计算之前将实例“分散”给工作人员,他们可以重用他们的本地副本,但是 dask 任务不应该通过改变对象来工作,而是通过返回结果(即功能上)。
how many copies of test_instance are in memory
1 个在客户端,加上每个正在执行的任务一个。序列化的版本也可能在附近,可能是一个保存在图中,它暂时在客户端,然后在调度程序上保存;在反序列化时,它也会暂时存在于工作内存中。对于某些类型,zero-copy de/ser 是可能的。
如果任务因为对象的大小而非常大,你一定要事先“分散”它们(client.scatter
)。
Is there any way to get this value to 1?
您可以 运行 调度程序 and/or worker in-process 共享内存,但是,当然,您会失去与 GIL 的并行性。
也许你可以试试 Actor
interface?该模式似乎符合您的工作流程。