Dask dataframe merge memory error with large csv files
Here is a simplified version of my code.
import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    # Both files have 70 columns and around 70 million rows, at a size of about 25 gigabytes.
    # The columns are a mix of ints, floats, datetimes and strings.
    # Almost all string lengths are less than 15; the two longest string columns have a max length of 70.
    csv_path_one = ""
    csv_path_two = ""

    left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
    right_df = dask_frame.read_csv(csv_path_two, sep=",", quotechar="\"", encoding="utf-8", dtype="object")

    cand_keys = [""]  # I have 3
    merged = dask_frame.merge(left_df, right_df, how="outer", on=cand_keys, suffixes=("_L", "_R"), indicator=True)

    # Keep only the rows that appear in just one of the two files.
    missing_mask = merged._merge != "both"
    missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]

    print(f"Running {client}")
    missing_findings.to_csv("results/findings-*.csv")

    cluster.close()
    client.close()


if __name__ == "__main__":
    main()
This example never finishes: Dask gets partway through, then one or more workers instantly exceed the memory limit, the nanny kills them, and every worker's progress is rolled back.
Looking at the diagnostics page, the memory peak usually happens around halfway through the shuffle-split tasks.
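For reference, a minimal sketch (not from the original question) of the knobs usually reached for when a merge blows past worker memory: an explicit per-worker memory_limit, smaller read_csv blocks so each partition is lighter, and concrete dtypes instead of a blanket dtype="object" so numeric columns don't become Python objects. The paths, column names, and sizes below are illustrative assumptions.

    # Illustrative sketch only; paths, column names, and limits are placeholders.
    import dask.dataframe as dask_frame
    from dask.distributed import Client, LocalCluster

    # Make the per-worker cap explicit so the nanny's kill threshold is known up front (assumed value).
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="8GB")
    client = Client(cluster)

    # Smaller blocks mean smaller partitions resident in memory at any one time,
    # and real dtypes keep numeric columns out of costly Python-object storage.
    left_df = dask_frame.read_csv(
        "left.csv", sep="|", encoding="Latin-1", blocksize="64MB",
        dtype={"id": "int64", "amount": "float64", "name": "object"},
    )
    right_df = dask_frame.read_csv(
        "right.csv", sep=",", encoding="utf-8", blocksize="64MB",
        dtype={"id": "int64", "amount": "float64", "name": "object"},
    )

    merged = dask_frame.merge(
        left_df, right_df, how="outer", on=["id"],
        suffixes=("_L", "_R"), indicator=True,
    )

Whether any of these is enough for a 25 GB outer join depends on how much actually matches across the two files; in this case the version upgrade mentioned below is what resolved it.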
I'm running Dask 2.9.1 on Windows.
I can update Dask, but my current setup makes that painful, and I don't know whether it would solve my problem.
Updating to 2.15 fixed the problem.
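If it helps to confirm that the upgrade actually took effect in the environment the workers run in, a quick version check (the "2.15.0" string here simply mirrors the version mentioned above):

    # Sanity check that the upgraded dask/distributed are the ones being imported.
    import dask
    import distributed

    print(dask.__version__)         # expect something like "2.15.0" after the upgrade
    print(distributed.__version__)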