如何将 Map[string,Dataframe] 填充为 scala 中 Dataframe 中的列
How to populate Map[string,Dataframe] as a column in a Dataframe in scala
我有一个Map[String, Dataframe]
。我想将该地图中的所有数据合并到一个数据框中。数据框可以有一列地图数据类型吗?
def sample(dfs : Map[String,Dataframe]): Dataframe =
{
.........
}
示例:
DF1
id name age
1 aaa 23
2 bbb 34
DF2
game time score
ludo 10 20
rummy 30 40
我把上面两个DF作为Map传给了函数。
然后将每个数据帧的数据以 json 格式放入输出数据帧的单列中。
出DF
+---------------------------------------------------------------------------------------+
| column1 |
+---------------------------------------------------------------------------------------+
| [{"id":"1","name":"aaa","age":"23"},{"id":21","name":"bbb","age":"24"}] |
| [{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}] |
+---------------------------------------------------------------------------------------+
这是针对您的用例的解决方案:
import org.apache.spark.sql._
def sample(dfs : Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
dfs
.values
.foldLeft(spark.emptyDataFrame)((acc, df) => acc.union(df))
需要 spark 会话来创建要折叠的空 DataFrame 累加器。
或者,如果您可以保证 Map
不为空。
def sample(dfs : Map[String, DataFrame]): DataFrame =
dfs
.values
.reduce((acc, df) => acc.union(df))
您要求为每个数据帧生成一行。请注意,如果其中一个数据帧足够大以至于不能包含在一个执行程序中,则此代码将会中断。
让我们首先生成类型为Map[String, DataFrame]
的数据和地图dfs
。
val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
dfs = Seq(df1, df2)
然后,对于地图的每个数据框,我们生成两列。 big_map
将数据框的每个列名与其值相关联(转换为字符串以具有一致的类型)。 df
只包含数据框的名称。然后我们将所有数据帧与 reduce
合并并按 name
分组(这是每个数据帧完全排成一行的部分,因此一个执行者)。
dfs
.toSeq
.map{ case (name, df) => df
.select(map(
df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))) : _*
) as "big_map")
.withColumn("df", lit(name))}
.reduce(_ union _)
.groupBy("df")
.agg(collect_list('big_map) as "column1")
.show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1 |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}] |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+
我有一个Map[String, Dataframe]
。我想将该地图中的所有数据合并到一个数据框中。数据框可以有一列地图数据类型吗?
def sample(dfs : Map[String,Dataframe]): Dataframe =
{
.........
}
示例:
DF1
id name age
1 aaa 23
2 bbb 34
DF2
game time score
ludo 10 20
rummy 30 40
我把上面两个DF作为Map传给了函数。 然后将每个数据帧的数据以 json 格式放入输出数据帧的单列中。
出DF
+---------------------------------------------------------------------------------------+
| column1 |
+---------------------------------------------------------------------------------------+
| [{"id":"1","name":"aaa","age":"23"},{"id":21","name":"bbb","age":"24"}] |
| [{"game":"ludo","time":"10","score":"20"},{"game":"rummy","time":"30","score":"40"}] |
+---------------------------------------------------------------------------------------+
这是针对您的用例的解决方案:
import org.apache.spark.sql._
def sample(dfs : Map[String, DataFrame])(implicit spark: SparkSession): DataFrame =
dfs
.values
.foldLeft(spark.emptyDataFrame)((acc, df) => acc.union(df))
需要 spark 会话来创建要折叠的空 DataFrame 累加器。
或者,如果您可以保证 Map
不为空。
def sample(dfs : Map[String, DataFrame]): DataFrame =
dfs
.values
.reduce((acc, df) => acc.union(df))
您要求为每个数据帧生成一行。请注意,如果其中一个数据帧足够大以至于不能包含在一个执行程序中,则此代码将会中断。
让我们首先生成类型为Map[String, DataFrame]
的数据和地图dfs
。
val df1 = Seq((1, "aaa", 23), (2, "bbb", 34)).toDF("id", "name", "age")
val df2 = Seq(("ludo", 10, 20), ("rummy", 10, 40)).toDF("game", "time", "score")
dfs = Seq(df1, df2)
然后,对于地图的每个数据框,我们生成两列。 big_map
将数据框的每个列名与其值相关联(转换为字符串以具有一致的类型)。 df
只包含数据框的名称。然后我们将所有数据帧与 reduce
合并并按 name
分组(这是每个数据帧完全排成一行的部分,因此一个执行者)。
dfs
.toSeq
.map{ case (name, df) => df
.select(map(
df.columns.flatMap(c => Seq(lit(c), col(c).cast("string"))) : _*
) as "big_map")
.withColumn("df", lit(name))}
.reduce(_ union _)
.groupBy("df")
.agg(collect_list('big_map) as "column1")
.show(false)
+---+-----------------------------------------------------------------------------------+
|df |column1 |
+---+-----------------------------------------------------------------------------------+
|df0|[{id -> 1, name -> aaa, age -> 23}, {id -> 2, name -> bbb, age -> 34}] |
|df1|[{game -> ludo, time -> 10, score -> 20}, {game -> rummy, time -> 10, score -> 40}]|
+---+-----------------------------------------------------------------------------------+