Why does dask.distributed.Client raise "TypeError: cannot pickle '_thread.RLock' object" when provided with a user-defined LocalCluster argument?
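I can implicitly create a dask.distributed.LocalCluster by calling the dask.distributed.Client constructor with no arguments. However, when I try to pass an explicitly defined cluster as a keyword argument, it fails with the exception: TypeError: cannot pickle '_thread.RLock' object.
Here is a minimal reproducer: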
import dask.bag
from dask.distributed import Client, LocalCluster


def main():
    with Client():
        dask.bag.from_sequence(range(5)).map(print).compute()  # prints 0 through 4

    cluster = LocalCluster()
    with Client(cluster=cluster):  # raises TypeError: cannot pickle '_thread.RLock' object
        dask.bag.from_sequence(range(5)).map(print).compute()


if __name__ == "__main__":
    main()
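The problem is that you are passing the wrong keyword argument to the Client constructor.
Client has no cluster parameter, so the cluster keyword you supplied is interpreted as a keyword argument that should be passed on to the workers. Because a LocalCluster cannot be pickled, you get the exception you are seeing.
Replace Client(cluster=cluster) with Client(address=cluster), or simply Client(cluster).
A complete working example is: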
import dask.bag
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster()
    with Client(address=cluster):
        dask.bag.from_sequence(range(5)).map(print).compute()


if __name__ == "__main__":
    main()
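The pickling failure itself can be reproduced without dask, which may help when reading the traceback. The sketch below is only an illustration: HoldsLock and try_pickle are hypothetical names, not part of dask, and the sketch assumes (going by the error message) that a LocalCluster keeps a threading.RLock somewhere in its state.

```python
import pickle
import threading


class HoldsLock:
    """Hypothetical stand-in for an object that keeps an RLock in its state."""

    def __init__(self):
        self.lock = threading.RLock()


def try_pickle(obj):
    """Return the TypeError message if obj cannot be pickled, else None."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


# pickle walks the instance attributes, reaches the RLock, and gives up.
message = try_pickle(HoldsLock())
print(message)
```

On recent Python versions the printed message should match the one in the question. Anything that has to be sent to a worker process goes through this kind of serialization, which is why a cluster object passed as a stray keyword argument ends in this TypeError.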