将本地文件从客户端加载到 dask 分布式集群

Loading local file from client onto dask distributed cluster

有点初学者的问题,但我找不到相关的答案..

基本上我的数据(7GB)位于我的本地机器上。我在本地网络上有分布式集群 运行。我怎样才能把这个文件放到集群上?

通常的 dd.read_csv() 或 read_parquet() 失败,因为工作人员无法在他们自己的环境中找到文件。

是否需要手动将文件传输到集群中的每个节点?

注意:由于管理员限制,我只能使用 SFTP...

两个选项

网络文件系统

正如评论中所建议的,使用普通文件系统解决方案,有多种方法可以让集群中的其他机器访问您的本地文件。如果您可以访问,这是一个不错的选择。

在本地加载和分散

如果这不起作用,那么您始终可以在本地加载数据并将其分散到集群中的各个工作程序。如果您的文件大于单台计算机的内存,那么您可能需要一块一块地处理。

单程

如果一切都适合内存,那么我会正常加载数据,然后将其分散给工作人员。如果需要,您可以随后将其拆分并传播给其他工人:

import pandas
import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler-address:8786')

df = pd.read_csv('myfile.csv')
future = client.scatter(df)  # send dataframe to one worker
ddf = dd.from_delayed([future], meta=df)  # build dask.dataframe on remote data
ddf = ddf.repartition(npartitions=20).persist()  # split
client.rebalance(ddf)  # spread around all of your workers

多位

如果你有多个小文件,那么你可以迭代加载和分散,也许在 for 循环中,然后从许多 futures

中创建一个 dask.dataframe
futures = []
for fn in filenames:
    df = pd.read_csv(fn)
    future = client.scatter(df)
    futures.append(future)

ddf = dd.from_delayed(futures, meta=df)

在这种情况下,您可以跳过重新分区和重新平衡步骤

如果您有一个大文件,那么您可能需要自己对它进行一些拆分,或者使用 pd.read_csv(..., chunksize=...)

网络解决方案:

  • 仅在 Windows 下它应该与共享 forlder 一起工作:dd.read_csv("\\server\shared_dir")

  • 只有在 Unix/Linux 下才能与 HDFS 一起使用:导入 hdfs3 然后 hdfs.read_csv('/server/data_dir'...)

但是如果你想同时使用 Windows 和 Linux 工作人员我不知道,因为 dd.read_csv() 和 UNC 似乎不受 Linux(因为文件路径'\server\data_dir')和带有 hdfs.read_csv 的 HDFS 在 Windows 下不工作(导入 hdfs3 失败,因为库 libhdfs3.so 没有存在于 Windows)

有人有针对 Windows 和 Unix 下的工作人员的网络解决方案吗?