使用 Databricks 并行加载小文件

small file parallel loading with Databricks

我需要处理几千个小日志文件。

我选择 Databricks 来处理这个问题,因为它具有良好的并行计算能力,并且可以与托管文件的 Azure Blob 存储帐户很好地交互。

经过一些研究,我总是检索到相同的代码片段(在 PySpark 中)。

# Getting your list of files with custom function
list_of_files = get_my_files()

# Create a path_rdd and use a custom udf to parse it
path_rdd = sc.parallelize(list_of_files)
content = path_rdd.map(parse_udf).collect()

有没有更好的方法来做到这一点?如果日志文件是 CSV 格式,您会选择 平面图 吗?

谢谢!

我目前的解决方案是:

content = sc.wholeTextFiles('/mnt/container/foo/*/*/', numPartitions=XX)
parsed_content = content.flatMap(custom_parser).collect()

我以字符串形式读取文件的所有内容并保留它们的文件名。 然后我使用 flatMap 将其传递给我的解析函数 "custom_parser","where custom_parser" 定义为

def custom_parser(*argv):
    file, content = argv
    # Apply magic
    return parsed_content_

我目前正在完成 .collect() 操作,但我会更改它以直接保存输出。