Spark 从镶木地板文件读取的列名中删除特殊字符
Spark remove special characters from column name read from a parquet file
我有使用以下 spark 命令读取的 parquet 文件
lazy val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
很多列的列名都有特殊字符“(”。比如WA_0_DWHRPD_Purge_Date_(TOD)
,WA_0_DWHRRT_Record_Type_(80=Index)
如何去掉这个特殊字符。
我的最终目标是删除这些特殊字符并使用以下命令写回 parquet 文件
df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")
此外,我正在使用 Scala spark shell。
我是 spark 的新手,我看到了类似的问题,但在我的案例中没有任何效果。感谢任何帮助。
您可以做的第一件事就是将 parquet 文件读入数据框中。
val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
创建数据框后,尝试获取数据框的架构并对其进行解析以删除所有特殊字符,如下所示:
import org.apache.spark.sql.functions._
val schema = StructType(out.schema.map(
x => StructField(x.name.toLowerCase().replace(" ", "_").replace("#", "").replace("-", "_").replace(")", "").replace("(", "").trim(),
x.dataType, x.nullable)))
现在您可以通过指定您创建的模式从 parquet 文件中读回数据。
val newDF = spark.read.format("parquet").schema(schema).load("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
现在您可以继续使用清理后的列名称保存数据框。
df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")
我有使用以下 spark 命令读取的 parquet 文件
lazy val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
很多列的列名都有特殊字符“(”。比如WA_0_DWHRPD_Purge_Date_(TOD)
,WA_0_DWHRRT_Record_Type_(80=Index)
如何去掉这个特殊字符。
我的最终目标是删除这些特殊字符并使用以下命令写回 parquet 文件
df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")
此外,我正在使用 Scala spark shell。 我是 spark 的新手,我看到了类似的问题,但在我的案例中没有任何效果。感谢任何帮助。
您可以做的第一件事就是将 parquet 文件读入数据框中。
val out = spark.read.parquet("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
创建数据框后,尝试获取数据框的架构并对其进行解析以删除所有特殊字符,如下所示:
import org.apache.spark.sql.functions._
val schema = StructType(out.schema.map(
x => StructField(x.name.toLowerCase().replace(" ", "_").replace("#", "").replace("-", "_").replace(")", "").replace("(", "").trim(),
x.dataType, x.nullable)))
现在您可以通过指定您创建的模式从 parquet 文件中读回数据。
val newDF = spark.read.format("parquet").schema(schema).load("/tmp/oip/logprint_poc/feb28eb24ffe44cab60f2832a98795b1.parquet")
现在您可以继续使用清理后的列名称保存数据框。
df_hive.write.format("parquet").save("hdfs:///tmp/oip/logprint_poc_cleaned/")