Spark 中区分大小写的镶木地板模式合并
Case sensitive parquet schema merge in Spark
我正在尝试使用 Spark 加载和分析一些 parquet 文件。
我正在使用 schemaMerge
加载文件,因为较新的文件有一些额外的列。还有一些文件的列名是小写的,而另一些是大写的。
例如
file1.parquet
的架构类似于
column1 integer,
column2 integer
和 file2.parquet
类似:
Column1 integer,
Column2 integer,
Column3 integer
我 运行 遇到 ParquetFileFormat
class 的 inferSchema
方法的问题。模式合并委托给 spark sql 的 StructType
merge
方法。据我所知,该方法只能以区分大小写的方式工作。
在内部,它使用映射按名称查找字段,如果大小写不匹配,它将把它解释为一个新字段。稍后,当检查模式是否有重复项时,区分大小写的配置将得到尊重,我们最终会得到重复的列。这导致
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
有什么方法可以使模式合并不区分大小写吗?
我期待得到这样的结果架构:
column1 integer,
column2 integer,
Column3 integer
您可以在配置中设置 spark.sql.caseSensitive=true
以使 Spark SQL 模式区分大小写。它还会影响架构合并。
scala> spark.conf.set("spark.sql.caseSensitive","true")
scala> val df = sc.parallelize(1 to 1000).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.withColumnRenamed("value","VALUE").write.parquet("test_uc")
scala> df.write.parquet("test_lc")
scala> val df2=spark.read.option("mergeSchema","true").parquet("test_*")
df2: org.apache.spark.sql.DataFrame = [value: int, VALUE: int]
scala> val merged = df2.columns.groupBy(_.toLowerCase)
.map(t => coalesce(t._2.map(col):_*).as(t._1))
.toArray
merged: Array[org.apache.spark.sql.Column] = Array(coalesce(value, VALUE) AS `value`)
scala> df2.select(merged:_*)
res2: org.apache.spark.sql.DataFrame = [value: int]
scala> spark.conf.set("spark.sql.caseSensitive","false")
// process your dataframe
我正在尝试使用 Spark 加载和分析一些 parquet 文件。
我正在使用 schemaMerge
加载文件,因为较新的文件有一些额外的列。还有一些文件的列名是小写的,而另一些是大写的。
例如
file1.parquet
的架构类似于
column1 integer,
column2 integer
和 file2.parquet
类似:
Column1 integer,
Column2 integer,
Column3 integer
我 运行 遇到 ParquetFileFormat
class 的 inferSchema
方法的问题。模式合并委托给 spark sql 的 StructType
merge
方法。据我所知,该方法只能以区分大小写的方式工作。
在内部,它使用映射按名称查找字段,如果大小写不匹配,它将把它解释为一个新字段。稍后,当检查模式是否有重复项时,区分大小写的配置将得到尊重,我们最终会得到重复的列。这导致
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema
有什么方法可以使模式合并不区分大小写吗?
我期待得到这样的结果架构:
column1 integer,
column2 integer,
Column3 integer
您可以在配置中设置 spark.sql.caseSensitive=true
以使 Spark SQL 模式区分大小写。它还会影响架构合并。
scala> spark.conf.set("spark.sql.caseSensitive","true")
scala> val df = sc.parallelize(1 to 1000).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]
scala> df.withColumnRenamed("value","VALUE").write.parquet("test_uc")
scala> df.write.parquet("test_lc")
scala> val df2=spark.read.option("mergeSchema","true").parquet("test_*")
df2: org.apache.spark.sql.DataFrame = [value: int, VALUE: int]
scala> val merged = df2.columns.groupBy(_.toLowerCase)
.map(t => coalesce(t._2.map(col):_*).as(t._1))
.toArray
merged: Array[org.apache.spark.sql.Column] = Array(coalesce(value, VALUE) AS `value`)
scala> df2.select(merged:_*)
res2: org.apache.spark.sql.DataFrame = [value: int]
scala> spark.conf.set("spark.sql.caseSensitive","false")
// process your dataframe