Spark 将单个数据帧连接到数据帧集合
Spark Join Single Dataframe to a Collection of Dataframes
我正在努力寻找一个优雅的解决方案,将单个数据帧连接到 1 到 N 个相关数据帧的单独序列。初始尝试:
val sources = program.attributes.map(attr => {
spark.read
.option("header", value = true)
.schema(program.GetSchema(attr))
.csv(s"${program.programRawHdfsDirectory}/${attr.sourceFile}")
})
val rawDf: DataFrame = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))
// Full of fail:
val fullDf: DataFrame = program.dimensions.filter(d => d.hierarchy != "RAW").reduceLeft((d1, _) => {
val hierarchy = spark.read.parquet(d1.hierarchyLocation).where(d1.hierarchyFilter)
rawDf.join(hierarchy, d1.hierarchyJoin)
})
fullDf.selectExpr(program.outputFields:_*).write.parquet(program.programEtlHdfsDirectory)
reduceLeft 的想法行不通,因为我正在遍历一组配置对象(维度 属性),但我希望从每次迭代中返回的是一个数据框。错误是类型不匹配,这并不奇怪。
问题的核心是我有 1 到 N "dimension" 个对象,它们定义了如何加载现有层次结构 table 以及如何将 table 加入我的 "raw" 我之前创建的数据框。
知道如何在没有某种可怕的破解的情况下创建这些连接吗?
更新:
我想知道这是否可行?我要加入的每个层次结构数据框中都有一个公共字段名称。如果我重命名此公共字段以匹配 "raw" 数据框中的相应列,我是否可以在不显式调出列的情况下执行折叠连接? Spark 会默认使用匹配的名称吗?
val rawDf = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))
val hierarchies = program.dimensions.map(dim => {
spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).withColumnRenamed("parent_hier_cd", dim.columnName)
})
val fullDf = hierarchies.foldLeft(rawDf) { (df1, df2) => df1.join(df2) }
更新 2
不,那行不通。 Spark 尝试交叉连接。
出于我的目的,我只需要在生成层次结构集合时 return 一个元组:
val hierarchies = program.dimensions.map(dim => {
val hierarchy = spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).alias(dim.hierarchy.toLowerCase)
(dim, hierarchy)
})
然后当我将它们折叠成 rawDf
时,我就有了构建连接所需的元数据。
我正在努力寻找一个优雅的解决方案,将单个数据帧连接到 1 到 N 个相关数据帧的单独序列。初始尝试:
val sources = program.attributes.map(attr => {
spark.read
.option("header", value = true)
.schema(program.GetSchema(attr))
.csv(s"${program.programRawHdfsDirectory}/${attr.sourceFile}")
})
val rawDf: DataFrame = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))
// Full of fail:
val fullDf: DataFrame = program.dimensions.filter(d => d.hierarchy != "RAW").reduceLeft((d1, _) => {
val hierarchy = spark.read.parquet(d1.hierarchyLocation).where(d1.hierarchyFilter)
rawDf.join(hierarchy, d1.hierarchyJoin)
})
fullDf.selectExpr(program.outputFields:_*).write.parquet(program.programEtlHdfsDirectory)
reduceLeft 的想法行不通,因为我正在遍历一组配置对象(维度 属性),但我希望从每次迭代中返回的是一个数据框。错误是类型不匹配,这并不奇怪。
问题的核心是我有 1 到 N "dimension" 个对象,它们定义了如何加载现有层次结构 table 以及如何将 table 加入我的 "raw" 我之前创建的数据框。
知道如何在没有某种可怕的破解的情况下创建这些连接吗?
更新:
我想知道这是否可行?我要加入的每个层次结构数据框中都有一个公共字段名称。如果我重命名此公共字段以匹配 "raw" 数据框中的相应列,我是否可以在不显式调出列的情况下执行折叠连接? Spark 会默认使用匹配的名称吗?
val rawDf = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))
val hierarchies = program.dimensions.map(dim => {
spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).withColumnRenamed("parent_hier_cd", dim.columnName)
})
val fullDf = hierarchies.foldLeft(rawDf) { (df1, df2) => df1.join(df2) }
更新 2
不,那行不通。 Spark 尝试交叉连接。
出于我的目的,我只需要在生成层次结构集合时 return 一个元组:
val hierarchies = program.dimensions.map(dim => {
val hierarchy = spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).alias(dim.hierarchy.toLowerCase)
(dim, hierarchy)
})
然后当我将它们折叠成 rawDf
时,我就有了构建连接所需的元数据。