如何将 Spark Dataframe(在 DataBricks 中)写入 Blob 存储(在 Azure 中)?

How to Write a Spark Dataframe (in DataBricks) to Blob Storage (in Azure)?

我在 DataBricks 工作,我有一个 DataFrame。

type(df) 
Out: pyspark.sql.dataframe.DataFrame

我唯一想要的就是将这个完整的 spark 数据帧写入 Azure Blob Storage.

我找到了 post。所以我尝试了那个代码:

# Configure blob storage account access key globally
spark.conf.set(
  "fs.azure.account.key.%s.blob.core.windows.net" % storage_name,
  sas_key)

output_container_path = "wasbs://%s@%s.blob.core.windows.net" % (output_container_name, storage_name)
output_blob_folder = "%s/wrangled_data_folder" % output_container_path

# write the dataframe as a single file to blob storage
(datafiles
 .coalesce(1)
 .write
 .mode("overwrite")
 .option("header", "true")
 .format("com.databricks.spark.csv")
 .save(output_blob_folder))

运行 该代码导致以下错误。为 parquet 和其他格式更改 "csv" 部分也失败了。

org.apache.spark.sql.AnalysisException: CSV data source does not support struct<AccessoryMaterials:string,CommercialOptions:string,DocumentsUsed:array<string>,Enumerations:array<string>,EnvironmentMeasurements:string,Files:array<struct<Value:string,checksum:string,checksumType:string,name:string,size:string>>,GlobalProcesses:string,Printouts:array<string>,Repairs:string,SoftwareCapabilities:string,TestReports:string,endTimestamp:string,name:string,signature:string,signatureMeaning:bigint,startTimestamp:string,status:bigint,workplace:string> data type.;

因此我的问题(我的假设应该很简单): 如何将我的 spark 数据帧从 DataBricks 写入 Azure Blob 存储?

我的 Azure 文件夹结构是这样的:

Account = MainStorage 
Container 1 is called "Data" # containing all the data, irrelevant because i already read this in. 
Container 2 is called "Output" # here I want to store my Spark Dataframe. 

非常感谢!

编辑 我正在使用 Python。但是,我不介意解决方案是否使用其他语言(只要 DataBricks 支持它们,例如 R/Scala 等)。如果有效,那就完美了:-)

假设您已经安装了 blob 存储,请使用以下方法将您的数据框写入 csv 格式。
请注意,新创建的文件将具有一些带有 csv 扩展名的默认文件名,因此您可能需要使用一致的名称重命名它。

// output_container_path= wasbs://ContainerName@StorageAccountName.blob.core.windows.net/DirectoryName 
val mount_root = "/mnt/ContainerName/DirectoryName"
df.coalesce(1).write.format("csv").option("header","true").mode("OverWrite").save(s"dbfs:$mount_root/")