Can dask parallelization be encapsulated in a class?
Can dask parallelization be encapsulated in a class? In its final form, my class will do a significant amount of initialization before run is called - I have reduced my problem down to a skeleton here. Note that the code works with a LocalCluster, and that distributed computation outside of a class also works on the same HPC cluster. Here is the stripped-down code along with the corresponding error messages:
import numpy as np
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import wait

class Simulate:
    def __init__(self):
        pass

    def run(self):
        cluster = PBSCluster(cores=12, memory='1GB', queue='low', project='classtest', name='classtest_dask',
                             walltime='02:00:00', local_directory='/scratch/mmatthews')
        cluster.scale(10)  # Ask for 10 workers
        client = Client(cluster)
        seeds = list(np.arange(100))
        a = client.map(self.run_trial, seeds)
        wait(a)
        trial_results = [a[i].result() for i in range(len(a))]
        cluster.scale(0)
        cluster.close()

    def run_trial(self, trial_seed):
        np.random.seed(trial_seed)
        rst = np.random.randint(100)  # draw one random integer for this trial
        print('Simulation Finished rst=%s' % rst)
        return rst

simob = Simulate()
simob.run()
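For reference, the same structure reportedly runs fine when the PBSCluster is swapped for a LocalCluster. A minimal sketch of that substitution (worker and thread counts here are illustrative assumptions, not values from the question) would be:

from dask.distributed import LocalCluster

# Illustrative substitution: replaces the PBSCluster(...) call above.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)

The PBSCluster run, by contrast, produces the errors below.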
Errors on stderr:
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR -
Traceback (most recent call last):
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR -
Traceback (most recent call last):
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 998, in _reconnect
    await self._close()
  File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
Errors in the PBS error file:
$ cat classtest_dask.e156272
distributed.nanny - INFO - Start Nanny at: 'tcp://160.84.192.224:40753'
distributed.diskutils - INFO - Found stale lock file and directory '/scratch/mmatthews/worker-bnjpcqmq', purging
distributed.worker - INFO - Start worker at: tcp://160.84.192.224:44564
distributed.worker - INFO - Listening to: tcp://160.84.192.224:44564
distributed.worker - INFO - dashboard at: 160.84.192.224:35232
distributed.worker - INFO - Waiting to connect to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 12
distributed.worker - INFO - Memory: 1000.00 MB
distributed.worker - INFO - Local Directory: /scratch/mmatthews/worker-kbw6dtj_
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://160.84.192.193:39664
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://160.84.192.224:40753'
distributed.dask_worker - INFO - End worker
Traceback (most recent call last):
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 410, in <module>
go()
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 406, in go
main()
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 764, in __call__
return self.main(*args, **kwargs)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 717, in main
rv = self.invoke(ctx)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 956, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/click/core.py", line 555, in invoke
return callback(*args, **kwargs)
File "/nfs/system/miniconda3_dev/envs/rosetta_dev/lib/python3.7/site-packages/distributed/cli/dask_worker.py", line 397, in main
raise TimeoutError("Timed out starting worker.") from None
tornado.util.TimeoutError: Timed out starting worker.
Can dask parallelization be encapsulated in a class?
Yes. Dask calls are just normal Python calls; there is nothing stopping them from interacting with other language constructs such as classes.
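To make that concrete, here is a minimal, self-contained sketch of the pattern using a LocalCluster so it can be run anywhere. The class name, worker counts, and the explicit client.close()/cluster.close() cleanup are illustrative choices, not taken from the question and not claimed to fix the PBS problem:

import numpy as np
from dask.distributed import Client, LocalCluster

class SimulateLocal:
    """Illustrative only: dask parallelization encapsulated in a class."""

    def run(self, n_trials=10):
        # Build the cluster and client inside the method, as in the question.
        cluster = LocalCluster(n_workers=4, threads_per_worker=1)
        client = Client(cluster)
        try:
            seeds = list(range(n_trials))
            futures = client.map(self.run_trial, seeds)   # mapping a bound method is fine
            results = client.gather(futures)              # blocks until all trials finish
        finally:
            client.close()
            cluster.close()
        return results

    def run_trial(self, trial_seed):
        rng = np.random.RandomState(trial_seed)
        return int(rng.randint(100))                      # one random integer per trial

if __name__ == '__main__':
    print(SimulateLocal().run())

One reason to keep the cluster and client as local variables in run rather than attributes on self is that self is pickled along with the bound method shipped to the workers, so keeping it free of connection objects avoids serialization surprises.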
Your actual error appears to be entirely unrelated. It looks as though something killed your worker (signal 15 is SIGTERM):
distributed.dask_worker - INFO - Exiting on signal 15
Unfortunately, there is no information here about what that something was. I recommend checking with your system administrators.