在 Dask Dataframe 子集上强制定位

Forcing Locality on Dask Dataframe Subsets

我正在尝试将一个大型 Dask Dataframe 分布在多台机器上,以便(稍后)在该 Dataframe 上进行分布式计算。我正在为此使用 dask-distributed。

我看到的所有 dask-distributed examples/docs 都是从网络资源(hdfs、s3 等)填充初始数据加载,并且似乎没有将 DAG 优化扩展到加载部分(似乎假设网络负载是一个必要的罪恶并且只会消耗初始成本。)这在另一个问题的答案中被强调:

但是,我可以看到我们想要这个的情况。例如,如果我们有一个分片数据库 + dask worker 共同位于该数据库的节点上,我们希望强制仅将来自本地分片的记录填充到本地 dask worker 中。从 documentation/examples 来看,网络交叉似乎是一个必然的假设成本。 是否可以强制从特定 worker 获取单个数据框的部分内容?

我尝试过的另一种方法是尝试强制每个工作人员 运行 一个函数(迭代提交给每个工作人员),其中该函数仅加载该函数的本地数据 machine/shard.这行得通,而且我有一堆具有相同列模式的最佳本地数据帧——但是——现在我没有一个数据帧,而是 n 数据帧。 是否可以 merge/fuse 跨多台机器的数据帧,因此只有一个数据帧引用,但部分与特定机器具有亲和力(在合理范围内,由任务 DAG 决定)?

您可以生成 dask "collections",例如来自期货和延迟对象的数据帧,它们 inter-operate 彼此很好。

对于每个分区,你知道应该加载它的机器,你可以产生一个未来如下:

f = c.submit(make_part_function, args, workers={'my.worker.ip'})

其中 c 是 dask 客户端,地址是您希望看到它发生的机器。您也可以给出 allow_other_workers=True 这是偏好而不是要求。

要根据此类期货列表制作数据框,您可以这样做

df = dd.from_delayed([dask.delayed(f) for f in futures])

最好提供 meta=,给出预期数据帧的描述。现在,对给定分区的进一步操作将更愿意安排在已经拥有数据的同一个工作人员上。

我也对能够将计算限制在特定节点(以及本地化到该节点的数据)感兴趣。我试图用一个简单的脚本(见下文)来实现上面的内容,但是查看生成的数据框,导致错误(来自 dask/dataframe/utils.py::check_meta()):

ValueError: Metadata mismatch found in `from_delayed`.

Expected partition of type `DataFrame` but got `DataFrame`

示例:

from dask.distributed import Client
import dask.dataframe as dd
import dask

client = Client(address='<scheduler_ip>:8786')
client.restart()

filename_1 = 'http://samplecsvs.s3.amazonaws.com/Sacramentorealestatetransactions.csv'
filename_2 = 'http://samplecsvs.s3.amazonaws.com/SalesJan2009.csv'

future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
future_2 = client.submit(dd.read_csv, filename_2, workers='w2')

client.has_what()
# Returns: {'tcp://<w1_ip>:41942': ('read_csv-c08b231bb22718946756cf46b2e0f5a1',),
#           'tcp://<w2_ip>:41942': ('read_csv-e27881faa0f641e3550a8d28f8d0e11d',)}

df = dd.from_delayed([dask.delayed(f) for f in [future_1, future_2]])

type(df)
# Returns: dask.dataframe.core.DataFrame

df.head()
# Returns:
#      ValueError: Metadata mismatch found in `from_delayed`.
#      Expected partition of type `DataFrame` but got `DataFrame`

注意 dask 环境有两个工作节点(别名为 w1 和 w2)一个调度程序节点,脚本在外部主机上 运行。 dask==1.2.2,分布式==1.28.1

并行调用许多 dask dataframe 函数很奇怪。也许您打算改为并行调用许多 Pandas read_csv 调用?

# future_1 = client.submit(dd.read_csv, filename_1, workers='w1')
# future_2 = client.submit(dd.read_csv, filename_2, workers='w2')
future_1 = client.submit(pandas.read_csv, filename_1, workers='w1')
future_2 = client.submit(pandas.read_csv, filename_2, workers='w2')

有关详细信息,请参阅 https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections