Spark 从镶木地板文件读取的列名中删除特殊字符

Spark remove special characters from column name read from a parquet file

我有使用以下 spark 命令读取的 parquet 文件

lazy val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

很多列的列名都有特殊字符“(”。比如WA_0_DWHRPD_Purge_Date_(TOD)WA_0_DWHRRT_Record_Type_(80=Index)如何去掉这个特殊字符。

我的最终目标是删除这些特殊字符并使用以下命令写回 parquet 文件

df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")

此外,我正在使用 Scala spark shell。 我是 spark 的新手,我看到了类似的问题,但在我的案例中没有任何效果。感谢任何帮助。

您可以做的第一件事就是将 parquet 文件读入数据框中。

val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

创建数据框后,尝试获取数据框的架构并对其进行解析以删除所有特殊字符,如下所示:

import org.apache.spark.sql.functions._
val schema = StructType(out.schema.map(
          x => StructField(x.name.toLowerCase().replace(" ", "_").replace("#", "").replace("-", "_").replace(")", "").replace("(", "").trim(),
            x.dataType, x.nullable)))

现在您可以通过指定您创建的模式从 parquet 文件中读回数据。

val newDF = spark.read.format("parquet").schema(schema).load("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")

现在您可以继续使用清理后的列名称保存数据框。

df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")