当编码时 Row 架构未知时,如何将字符串与 Row 合并以创建新的 spark 数据框?

How do I merge string with a Row to create a new spark dataframe when Row schema is unknown while coding?

我创建了一个函数,它将一行作为输入并给出一个字符串作为输出。我计划将此功能应用于架构彼此不同的各种数据框。这些数据框很大,每个都有数百万行,但每个数据框都有一个定义的模式

我想创建另一个函数来调用第一个函数,将函数的输出字符串与它发送给函数的行合并,并创建一个新的数据帧,它将作为第二个函数的输出。

这两个函数都将在spark-scala环境中编写。我是 spark-scala 的新手,不太确定如何将这些行组合成一个新的数据框

def returnTranformFunctionOutput(inputDataRow: Row, TransformFrame: Array[Row]): String = {
 val resultString = "testdata"
    resultString
  }

  def returnOutputDataframe(inputDataframe: DataFrame, TranformFrame: Array[Row]): DataFrame = {

    val inputSchema = inputDataframe.schema
    val outputSchema =  StructType(StructField("outputVal", StringType, true) :: Nil)
    val final_schema = StructType((inputSchema ++ outputSchema))
    val newDf = inputDataframe.map(row => {
      return Row.merge(row,TransformFunctions.returnTranformFunctionOutput(row,TranformFrame))
    }),final_schema)
    newDf
  }

returnOutputDataframe 无法编译,在执行 Row.merge 时会出现多个错误,包括 no implicits found for parameter evidence: Encoder[U_]type mismatch: Required:Row Found:string

是否可以合并一个字符串和一行以创建一个新行,然后将其合并到一个新的数据框中?

您正尝试在 returnOutputDataframe 中 return Dataframe,但是 .map 步骤将生成 Dataset 并且您也在传递架构而不是编码器。您可以将 inputDataframe 转换为 RDD[Row],映射值,然后使用具有新模式的 spark.createDataFrame 创建 DF。请参阅下面的示例。

  val row1 = RowFactory.create("1","2")
  val schema1 = new StructType()
    .add("c0","string")
    .add("c1","string")

  val row2 = RowFactory.create("A","B")
  val schema2 = new StructType()
    .add("c2","string")
    .add("c3","string")


  val df1 = spark.createDataFrame(sc.parallelize(Seq(row1)),schema1)
  df1.show()

  val rdd = df1.rdd.map(s => Row.merge(s, row2))
  val schema = StructType(schema1 ++ schema2)

  val df = spark.createDataFrame(rdd,schema)
  df.printSchema()
  df.show()

    +---+---+
    | c0| c1|
    +---+---+
    |  1|  2|
    +---+---+

    root
     |-- c0: string (nullable = true)
     |-- c1: string (nullable = true)
     |-- c2: string (nullable = true)
     |-- c3: string (nullable = true)

    +---+---+---+---+
    | c0| c1| c2| c3|
    +---+---+---+---+
    |  1|  2|  A|  B|
    +---+---+---+---+

根据上面 chlebek 的回答,我的最终函数是:

def returnOutputDataframe( inputDataframe: DataFrame, TranformFrame: Broadcast[Array[Row]]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema =  StructType(StructField("outputval", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val schemaEncoder = RowEncoder(final_schema)
val outputDf = inputDataframe.map(row =>
  Row.merge(row,RowFactory.create(returnTranformFunctionOutputString(row, TranformFrame))))(schemaEncoder)
outputDf
   }
}

在我的测试中,使用 inputDataframe.map 似乎比 inputDataframe.rdd.map 更快,而且它避免了必须使用 createDataFrame 步骤。