如何使用 Dask 在 yarn 上 运行 并行化 python 作业?

How to run parallelized python jobs on yarn using Dask?

我有几个关于将 Dask 与 Hadoop/Yarn 一起使用的问题。


1 ) 如何将 Dask 连接到 Hadoop/YARN 并并行化作业?
当我尝试使用:

from dask.distributed import Client
client = Client('Mynamenode:50070') 

它导致错误:

CommClosedError: in : Stream is closed: while trying to call remote method 'identity'

我应该传递名称节点或数据节点的地址吗?我可以改用 Zookeeper 吗?


2 ) 如何使用 Dask 和 HDFS3 从 HDFS 读取数据?
当我尝试使用以下方式读取文件时:

import dask.dataframe as dd
import distributed.hdfs
df = dd.read_csv('hdfs:///user/uname/dataset/temps.csv')

它导致以下错误:

ImportError: No module named lib

我已经尝试卸载并重新安装 hdfs3,但错误仍然存​​在。

我已经安装了 knit 并尝试使用以下示例启动 yarn 容器:

http://knit.readthedocs.io/en/latest/examples.html#ipython-parallel

此操作因安全错误而失败。

我在集群上没有 sudo 访问权限,所以在集群中的每个节点上安装任何包都是不可能的,我只能安装通过 condapip 在我的 userid.


最后,如果有人可以 post Dask on Yarn 的工作示例,那将非常有帮助。

非常感谢任何帮助,

dask-on-yarn 的最简单实现如下所示

  • conda install knit -c conda-forge 安装 knit(包 "dask-yarn" 很快就会可用,也许是一个更明显的名字)

可以在 the documentation 中找到有关如何创建 dask 集群的最简单示例。这里创建一个本地的conda环境,上传到HDFS,让YARN分发给worker,所以不需要sudo权限。

请注意,您可以传递很多参数,因此建议您阅读文档的使用和故障排除部分。

具体问题解答

1) Client('Mynamenode:50070') - hadoop 对 dask 一无所知,名称节点服务器没有理由知道如何处理 dask 客户端连接

2) No module named lib - 这很奇怪,也许是一个应该单独记录的错误。我鼓励您检查客户端和任何工作人员中是否具有兼容版本的 hdfs3(最好是最新版本)

3) fails with a security error - 这是相当模糊的,没有进一步的信息我不能说更多。您启用了什么安全措施,您看到什么错误?可能是您需要使用 kerberos 进行身份验证但没有 运行 kinit.