Dask 延迟总和被杀死,但有足够的资源
Dask delayed sum gets killed but there are enough resources
我正在创建一个读取整个文件夹的函数,创建一个 Dask 数据框,然后处理该数据框的分区并对结果求和,如下所示:
import dask.dataframe as dd
from dask import delayed, compute
def partitions_func(folder):
df = dd.read_csv(f'{folder}/*.csv')
partial_results = []
for partition in df.partitions:
partial = another_function(partition)
partial_results.append(partial)
total = delayed(sum)(partial_results)
return total
在 partitions_func
(another_function
) 中调用的函数也被延迟。
@delayed
def another_function(partition):
# Partition processing
return result
我检查了一下,在处理过程中创建的变量都很小,所以它们应该不会引起任何问题。分区可以很大,但不能大于可用 RAM。
当我执行 partitions_func(folder)
时,进程被终止。起初,我认为问题与有两个 delayed
有关,一个在 another_function
上,一个在 delayed(sum)
.
上
从 another_function
中删除 delayed
装饰器会导致问题,因为参数是一个 Dask 数据框,您不能执行 tolist()
之类的操作。我尝试从 sum
中删除 delayed
,因为我认为这可能是并行化和可用资源的问题,但进程也会被终止。
不过,我知道有5个分区。如果我从 partitions_func
中删除语句 total = delayed(sum)(partial_results)
并改为“手动”计算总和,一切都会按预期进行:
total = partial_results[0].compute() + partial_results[1].compute() + partial_results[2].compute() \
+ partial_results[3].compute() + partial_results[4].compute()
谢谢!
Dask dataframe 创建一系列延迟对象,因此当您调用延迟函数时 another_function
变成嵌套延迟并且 dask.compute
将无法处理它。一种选择是使用.map_partitions()
,典型的例子是df.map_partitions(len).compute()
,它将计算每个分区的长度。因此,如果您可以重写 another_function
以接受 pandas 数据帧,并删除延迟装饰器,那么您的代码将大致如下所示:
df = dd.read_csv(f'{folder}/*.csv')
total = df.map_partitions(another_function)
现在 total
是一个延迟对象,您可以将其传递给 dask.compute
(或简单地 运行 total = df.map_partitions(another_function).compute()
)。
我正在创建一个读取整个文件夹的函数,创建一个 Dask 数据框,然后处理该数据框的分区并对结果求和,如下所示:
import dask.dataframe as dd
from dask import delayed, compute
def partitions_func(folder):
df = dd.read_csv(f'{folder}/*.csv')
partial_results = []
for partition in df.partitions:
partial = another_function(partition)
partial_results.append(partial)
total = delayed(sum)(partial_results)
return total
在 partitions_func
(another_function
) 中调用的函数也被延迟。
@delayed
def another_function(partition):
# Partition processing
return result
我检查了一下,在处理过程中创建的变量都很小,所以它们应该不会引起任何问题。分区可以很大,但不能大于可用 RAM。
当我执行 partitions_func(folder)
时,进程被终止。起初,我认为问题与有两个 delayed
有关,一个在 another_function
上,一个在 delayed(sum)
.
从 another_function
中删除 delayed
装饰器会导致问题,因为参数是一个 Dask 数据框,您不能执行 tolist()
之类的操作。我尝试从 sum
中删除 delayed
,因为我认为这可能是并行化和可用资源的问题,但进程也会被终止。
不过,我知道有5个分区。如果我从 partitions_func
中删除语句 total = delayed(sum)(partial_results)
并改为“手动”计算总和,一切都会按预期进行:
total = partial_results[0].compute() + partial_results[1].compute() + partial_results[2].compute() \
+ partial_results[3].compute() + partial_results[4].compute()
谢谢!
Dask dataframe 创建一系列延迟对象,因此当您调用延迟函数时 another_function
变成嵌套延迟并且 dask.compute
将无法处理它。一种选择是使用.map_partitions()
,典型的例子是df.map_partitions(len).compute()
,它将计算每个分区的长度。因此,如果您可以重写 another_function
以接受 pandas 数据帧,并删除延迟装饰器,那么您的代码将大致如下所示:
df = dd.read_csv(f'{folder}/*.csv')
total = df.map_partitions(another_function)
现在 total
是一个延迟对象,您可以将其传递给 dask.compute
(或简单地 运行 total = df.map_partitions(another_function).compute()
)。