Spark 将单个数据帧连接到数据帧集合

Spark Join Single Dataframe to a Collection of Dataframes

我正在努力寻找一个优雅的解决方案,将单个数据帧连接到 1 到 N 个相关数据帧的单独序列。初始尝试:

  val sources = program.attributes.map(attr => {
    spark.read
      .option("header", value = true)
      .schema(program.GetSchema(attr))
      .csv(s"${program.programRawHdfsDirectory}/${attr.sourceFile}")
  })
  val rawDf: DataFrame = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))

  // Full of fail:
  val fullDf: DataFrame = program.dimensions.filter(d => d.hierarchy != "RAW").reduceLeft((d1, _) => {
    val hierarchy = spark.read.parquet(d1.hierarchyLocation).where(d1.hierarchyFilter)
    rawDf.join(hierarchy, d1.hierarchyJoin)
  })

  fullDf.selectExpr(program.outputFields:_*).write.parquet(program.programEtlHdfsDirectory)

reduceLeft 的想法行不通,因为我正在遍历一组配置对象(维度 属性),但我希望从每次迭代中返回的是一个数据框。错误是类型不匹配,这并不奇怪。

问题的核心是我有 1 到 N "dimension" 个对象,它们定义了如何加载现有层次结构 table 以及如何将 table 加入我的 "raw" 我之前创建的数据框。

知道如何在没有某种可怕的破解的情况下创建这些连接吗?

更新:

我想知道这是否可行?我要加入的每个层次结构数据框中都有一个公共字段名称。如果我重命名此公共字段以匹配 "raw" 数据框中的相应列,我是否可以在不显式调出列的情况下执行折叠连接? Spark 会默认使用匹配的名称吗?

  val rawDf = sources.reduce((df1, df2) => df1.join(df2, program.dimensionFields, "full"))

  val hierarchies = program.dimensions.map(dim => {
    spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).withColumnRenamed("parent_hier_cd", dim.columnName)
  })
  val fullDf = hierarchies.foldLeft(rawDf) { (df1, df2) => df1.join(df2) }

更新 2

不,那行不通。 Spark 尝试交叉连接。

出于我的目的,我只需要在生成层次结构集合时 return 一个元组:

  val hierarchies = program.dimensions.map(dim => {
    val hierarchy = spark.read.parquet(dim.hierarchyLocation).where(dim.hierarchyFilter).alias(dim.hierarchy.toLowerCase)
    (dim, hierarchy)
  })

然后当我将它们折叠成 rawDf 时,我就有了构建连接所需的元数据。