使用 Databricks 并行加载小文件
small file parallel loading with Databricks
我需要处理几千个小日志文件。
我选择 Databricks 来处理这个问题,因为它具有良好的并行计算能力,并且可以与托管文件的 Azure Blob 存储帐户很好地交互。
经过一些研究,我总是检索到相同的代码片段(在 PySpark 中)。
# Getting your list of files with custom function
list_of_files = get_my_files()
# Create a path_rdd and use a custom udf to parse it
path_rdd = sc.parallelize(list_of_files)
content = path_rdd.map(parse_udf).collect()
有没有更好的方法来做到这一点?如果日志文件是 CSV 格式,您会选择 平面图 吗?
谢谢!
我目前的解决方案是:
content = sc.wholeTextFiles('/mnt/container/foo/*/*/', numPartitions=XX)
parsed_content = content.flatMap(custom_parser).collect()
我以字符串形式读取文件的所有内容并保留它们的文件名。
然后我使用 flatMap 将其传递给我的解析函数 "custom_parser","where custom_parser" 定义为
def custom_parser(*argv):
file, content = argv
# Apply magic
return parsed_content_
我目前正在完成 .collect() 操作,但我会更改它以直接保存输出。
我需要处理几千个小日志文件。
我选择 Databricks 来处理这个问题,因为它具有良好的并行计算能力,并且可以与托管文件的 Azure Blob 存储帐户很好地交互。
经过一些研究,我总是检索到相同的代码片段(在 PySpark 中)。
# Getting your list of files with custom function
list_of_files = get_my_files()
# Create a path_rdd and use a custom udf to parse it
path_rdd = sc.parallelize(list_of_files)
content = path_rdd.map(parse_udf).collect()
有没有更好的方法来做到这一点?如果日志文件是 CSV 格式,您会选择 平面图 吗?
谢谢!
我目前的解决方案是:
content = sc.wholeTextFiles('/mnt/container/foo/*/*/', numPartitions=XX)
parsed_content = content.flatMap(custom_parser).collect()
我以字符串形式读取文件的所有内容并保留它们的文件名。 然后我使用 flatMap 将其传递给我的解析函数 "custom_parser","where custom_parser" 定义为
def custom_parser(*argv):
file, content = argv
# Apply magic
return parsed_content_
我目前正在完成 .collect() 操作,但我会更改它以直接保存输出。