结合没有重复的 Spark 模式?
Combining Spark schema without duplicates?
为了处理我拥有的数据,我之前提取了架构,因此当我读取数据集时,我提供了架构,而不是经过昂贵的推断架构步骤。
为了构建模式,我需要将几个不同的模式合并到最终模式中,所以我一直在使用 union (++)
和 distinct
方法,但我不断得到 org.apache.spark.sql.AnalysisException: Duplicate column(s)
异常。
例如,假设我们在以下结构中有两个模式:
val schema1 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema2 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema3 = StructType(StructField("A", StructType(
StructField("i", StringType, true) ::
StructField("ii", StringType, true) :: Nil
), true) :: Nil)
val final_schema = (schema1 ++ schema2 ++ schema3).distinct
println(final_schema)
输出:
StructType(
StructField(A,StructType(
StructField(i,StringType,true)),true),
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
我知道 distinct
只有与另一个模式完全匹配的模式结构才会被过滤掉。但是我希望结果看起来像这样:
StructType(
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
其中所有 "combined" 进入一个模式。我筛选了 scala documentation 中的所有方法,但我似乎找不到解决此问题的正确方法。有什么想法吗?
编辑:
最终目标是将 final_schema
馈入 sqlContext.read.schema
并使用 read
方法读取 JSON 字符串的 RDD。
尝试这样的事情:
(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)
其中 getKey
是一个从架构到您要考虑合并的属性(例如列名称或子字段名称)的函数。在 map
函数中,您可以使用 head 或使用一些更精细的函数来保留特定的模式。
Spark 与 Scala:
val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)
与 Java 的火花:
StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());
为了处理我拥有的数据,我之前提取了架构,因此当我读取数据集时,我提供了架构,而不是经过昂贵的推断架构步骤。
为了构建模式,我需要将几个不同的模式合并到最终模式中,所以我一直在使用 union (++)
和 distinct
方法,但我不断得到 org.apache.spark.sql.AnalysisException: Duplicate column(s)
异常。
例如,假设我们在以下结构中有两个模式:
val schema1 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema2 = StructType(StructField("A", StructType(
StructField("i", StringType, true) :: Nil
), true) :: Nil)
val schema3 = StructType(StructField("A", StructType(
StructField("i", StringType, true) ::
StructField("ii", StringType, true) :: Nil
), true) :: Nil)
val final_schema = (schema1 ++ schema2 ++ schema3).distinct
println(final_schema)
输出:
StructType(
StructField(A,StructType(
StructField(i,StringType,true)),true),
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
我知道 distinct
只有与另一个模式完全匹配的模式结构才会被过滤掉。但是我希望结果看起来像这样:
StructType(
StructField(A,StructType(
StructField(i,StringType,true),
StructField(ii,StringType,true)),true))
其中所有 "combined" 进入一个模式。我筛选了 scala documentation 中的所有方法,但我似乎找不到解决此问题的正确方法。有什么想法吗?
编辑:
最终目标是将 final_schema
馈入 sqlContext.read.schema
并使用 read
方法读取 JSON 字符串的 RDD。
尝试这样的事情:
(schema1 ++ schema2 ++ schema3).groupBy(getKey).map(_._2.head)
其中 getKey
是一个从架构到您要考虑合并的属性(例如列名称或子字段名称)的函数。在 map
函数中,您可以使用 head 或使用一些更精细的函数来保留特定的模式。
Spark 与 Scala:
val consolidatedSchema = test1Df.schema.++:(test2Df.schema).toSet
val uniqueConsolidatedSchemas = StructType(consolidatedSchema.toSeq)
与 Java 的火花:
StructType consolidatedSchema = test1Df.schema().merge(test2Df.schema());