当编码时 Row 架构未知时,如何将字符串与 Row 合并以创建新的 spark 数据框?
How do I merge string with a Row to create a new spark dataframe when Row schema is unknown while coding?
我创建了一个函数,它将一行作为输入并给出一个字符串作为输出。我计划将此功能应用于架构彼此不同的各种数据框。这些数据框很大,每个都有数百万行,但每个数据框都有一个定义的模式
我想创建另一个函数来调用第一个函数,将函数的输出字符串与它发送给函数的行合并,并创建一个新的数据帧,它将作为第二个函数的输出。
这两个函数都将在spark-scala环境中编写。我是 spark-scala 的新手,不太确定如何将这些行组合成一个新的数据框
def returnTranformFunctionOutput(inputDataRow: Row, TransformFrame: Array[Row]): String = {
val resultString = "testdata"
resultString
}
def returnOutputDataframe(inputDataframe: DataFrame, TranformFrame: Array[Row]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema = StructType(StructField("outputVal", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val newDf = inputDataframe.map(row => {
return Row.merge(row,TransformFunctions.returnTranformFunctionOutput(row,TranformFrame))
}),final_schema)
newDf
}
returnOutputDataframe
无法编译,在执行 Row.merge 时会出现多个错误,包括 no implicits found for parameter evidence: Encoder[U_]
和 type mismatch: Required:Row Found:string
。
是否可以合并一个字符串和一行以创建一个新行,然后将其合并到一个新的数据框中?
您正尝试在 returnOutputDataframe
中 return Dataframe
,但是 .map 步骤将生成 Dataset
并且您也在传递架构而不是编码器。您可以将 inputDataframe
转换为 RDD[Row]
,映射值,然后使用具有新模式的 spark.createDataFrame 创建 DF。请参阅下面的示例。
val row1 = RowFactory.create("1","2")
val schema1 = new StructType()
.add("c0","string")
.add("c1","string")
val row2 = RowFactory.create("A","B")
val schema2 = new StructType()
.add("c2","string")
.add("c3","string")
val df1 = spark.createDataFrame(sc.parallelize(Seq(row1)),schema1)
df1.show()
val rdd = df1.rdd.map(s => Row.merge(s, row2))
val schema = StructType(schema1 ++ schema2)
val df = spark.createDataFrame(rdd,schema)
df.printSchema()
df.show()
+---+---+
| c0| c1|
+---+---+
| 1| 2|
+---+---+
root
|-- c0: string (nullable = true)
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
|-- c3: string (nullable = true)
+---+---+---+---+
| c0| c1| c2| c3|
+---+---+---+---+
| 1| 2| A| B|
+---+---+---+---+
根据上面 chlebek 的回答,我的最终函数是:
def returnOutputDataframe( inputDataframe: DataFrame, TranformFrame: Broadcast[Array[Row]]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema = StructType(StructField("outputval", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val schemaEncoder = RowEncoder(final_schema)
val outputDf = inputDataframe.map(row =>
Row.merge(row,RowFactory.create(returnTranformFunctionOutputString(row, TranformFrame))))(schemaEncoder)
outputDf
}
}
在我的测试中,使用 inputDataframe.map
似乎比 inputDataframe.rdd.map
更快,而且它避免了必须使用 createDataFrame
步骤。
我创建了一个函数,它将一行作为输入并给出一个字符串作为输出。我计划将此功能应用于架构彼此不同的各种数据框。这些数据框很大,每个都有数百万行,但每个数据框都有一个定义的模式
我想创建另一个函数来调用第一个函数,将函数的输出字符串与它发送给函数的行合并,并创建一个新的数据帧,它将作为第二个函数的输出。
这两个函数都将在spark-scala环境中编写。我是 spark-scala 的新手,不太确定如何将这些行组合成一个新的数据框
def returnTranformFunctionOutput(inputDataRow: Row, TransformFrame: Array[Row]): String = {
val resultString = "testdata"
resultString
}
def returnOutputDataframe(inputDataframe: DataFrame, TranformFrame: Array[Row]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema = StructType(StructField("outputVal", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val newDf = inputDataframe.map(row => {
return Row.merge(row,TransformFunctions.returnTranformFunctionOutput(row,TranformFrame))
}),final_schema)
newDf
}
returnOutputDataframe
无法编译,在执行 Row.merge 时会出现多个错误,包括 no implicits found for parameter evidence: Encoder[U_]
和 type mismatch: Required:Row Found:string
。
是否可以合并一个字符串和一行以创建一个新行,然后将其合并到一个新的数据框中?
您正尝试在 returnOutputDataframe
中 return Dataframe
,但是 .map 步骤将生成 Dataset
并且您也在传递架构而不是编码器。您可以将 inputDataframe
转换为 RDD[Row]
,映射值,然后使用具有新模式的 spark.createDataFrame 创建 DF。请参阅下面的示例。
val row1 = RowFactory.create("1","2")
val schema1 = new StructType()
.add("c0","string")
.add("c1","string")
val row2 = RowFactory.create("A","B")
val schema2 = new StructType()
.add("c2","string")
.add("c3","string")
val df1 = spark.createDataFrame(sc.parallelize(Seq(row1)),schema1)
df1.show()
val rdd = df1.rdd.map(s => Row.merge(s, row2))
val schema = StructType(schema1 ++ schema2)
val df = spark.createDataFrame(rdd,schema)
df.printSchema()
df.show()
+---+---+
| c0| c1|
+---+---+
| 1| 2|
+---+---+
root
|-- c0: string (nullable = true)
|-- c1: string (nullable = true)
|-- c2: string (nullable = true)
|-- c3: string (nullable = true)
+---+---+---+---+
| c0| c1| c2| c3|
+---+---+---+---+
| 1| 2| A| B|
+---+---+---+---+
根据上面 chlebek 的回答,我的最终函数是:
def returnOutputDataframe( inputDataframe: DataFrame, TranformFrame: Broadcast[Array[Row]]): DataFrame = {
val inputSchema = inputDataframe.schema
val outputSchema = StructType(StructField("outputval", StringType, true) :: Nil)
val final_schema = StructType((inputSchema ++ outputSchema))
val schemaEncoder = RowEncoder(final_schema)
val outputDf = inputDataframe.map(row =>
Row.merge(row,RowFactory.create(returnTranformFunctionOutputString(row, TranformFrame))))(schemaEncoder)
outputDf
}
}
在我的测试中,使用 inputDataframe.map
似乎比 inputDataframe.rdd.map
更快,而且它避免了必须使用 createDataFrame
步骤。