Combine two KeyValueGroupedDatasets
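I have two datasets that I group by key:
import spark.implicits._ // for toDF/as; assumes a SparkSession named spark, as in spark-shell
case class Stg(number: String, time: String) // record types implied by the column names
case class Agg(number: String, time: String)
val stgDS = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"), ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
.toDF("number", "time")
.as[Stg]
val aggDS = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
.toDF("number", "time")
.as[Agg]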
Then I can apply a function to each group like this:
stgDS.groupByKey(_.number)
.flatMapGroups{case(k, iterator) => somefunction(iterator)}
How can I combine
stgDS.groupByKey(_.number)
aggDS.groupByKey(_.number)
to get something like
(k, (iteratorStg, iteratorAgg))
and then run
.flatMapGroups{case(k, (iteratorStg, iteratorAgg)) => somefunction(iteratorStg, iteratorAgg)}
on the result?
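I looked at combineByKey (from the pair-RDD API), but either it is just another variant of grouping or I don't understand how it works.
A simple join won't work, because I want to loop over the two iterators separately.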
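To see why a plain join does not give two separate iterators, here is a small model using ordinary Scala collections instead of Spark (the object JoinModel and its values are illustrative assumptions, not part of the question). An inner join on number emits one row per matching (Stg, Agg) pair, so key 1 yields 5 × 3 = 15 rows and key 2 yields 5 × 2 = 10 rows, rather than one group holding both sides.

```scala
// Plain Scala collections, no Spark: a model of an inner equi-join on `number`.
object JoinModel {
  final case class Stg(number: String, time: String)
  final case class Agg(number: String, time: String)

  val stg: Seq[Stg] = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
    ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5")).map { case (n, t) => Stg(n, t) }
  val agg: Seq[Agg] = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
    .map { case (n, t) => Agg(n, t) }

  // Inner join: one output row per matching (Stg, Agg) pair within a key.
  val joined: Seq[(Stg, Agg)] =
    for { s <- stg; a <- agg if s.number == a.number } yield (s, a)

  // Rows per key = |Stg group| * |Agg group|, i.e. the per-key cross product.
  val rowsPerKey: Map[String, Int] =
    joined.groupBy(_._1.number).map { case (k, rows) => k -> rows.size }

  def main(args: Array[String]): Unit =
    println(s"key 1: ${rowsPerKey("1")} rows, key 2: ${rowsPerKey("2")} rows") // key 1: 15 rows, key 2: 10 rows
}
```

The pairs still have to be regrouped afterwards, which is exactly the work cogroup does directly.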
Two KeyValueGroupedDatasets can be combined with cogroup.
From the cogroup documentation:
Applies the given function to each cogrouped data. For each unique group, the function will be passed the grouping key and 2 iterators containing all elements in the group from Dataset this and other.
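The contract quoted above can be modelled with the standard library alone. The cogroup helper below is hypothetical and is not Spark's API; it only mimics the described behaviour: one call per key present on either side, receiving the key plus one iterator per dataset (an empty iterator when a side has no rows for that key). Spark gives no ordering guarantee across keys; the model sorts keys only so its output is stable.

```scala
// Plain Scala collections, no Spark: a model of what cogroup hands to f.
object CogroupModel {
  final case class Stg(number: String, time: String)
  final case class Agg(number: String, time: String)

  // Hypothetical helper, not a Spark API. Calls f once per key found on either side,
  // with one iterator per side (empty if that side has no rows for the key).
  def cogroup[K, V, U, R](left: Seq[V], right: Seq[U])(keyL: V => K, keyR: U => K)(
      f: (K, Iterator[V], Iterator[U]) => Iterator[R])(implicit ord: Ordering[K]): Seq[R] = {
    val l = left.groupBy(keyL)
    val r = right.groupBy(keyR)
    // Keys are sorted only to make this model's output deterministic.
    (l.keySet ++ r.keySet).toSeq.sorted.flatMap { k =>
      f(k, l.getOrElse(k, Seq.empty[V]).iterator, r.getOrElse(k, Seq.empty[U]).iterator)
    }
  }

  val stg: Seq[Stg] = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
    ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5")).map { case (n, t) => Stg(n, t) }
  val agg: Seq[Agg] = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
    .map { case (n, t) => Agg(n, t) }

  // One output row per key, built by walking each side's iterator independently.
  val result: Seq[(String, Int, Int)] =
    cogroup(stg, agg)(_.number, _.number) { (key: String, it1: Iterator[Stg], it2: Iterator[Agg]) =>
      Iterator((key, it1.size, it2.size))
    }

  def main(args: Array[String]): Unit =
    result.foreach(println) // prints (1,5,3) then (2,5,2)
}
```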
Code:
val stgGroupedDS = stgDS.groupByKey(_.number)
val aggGroupedDS = aggDS.groupByKey(_.number)
stgGroupedDS.cogroup(aggGroupedDS)(
  // run whatever logic is required and then return an iterator
  (key: String, it1: Iterator[Stg], it2: Iterator[Agg]) =>
    Seq((key, it1.toList.mkString(",") + "//" + it2.toList.mkString(","))).iterator
)
.show(false)
prints:
+---+------------------------------------------------------------------------+
|_1 |_2                                                                      |
+---+------------------------------------------------------------------------+
|1  |Stg(1,1),Stg(1,2),Stg(1,3),Stg(1,4),Stg(1,5)//Agg(1,1),Agg(1,4),Agg(1,8)|
|2  |Stg(2,1),Stg(2,2),Stg(2,3),Stg(2,4),Stg(2,5)//Agg(2,2),Agg(2,5)         |
+---+------------------------------------------------------------------------+
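A complete, runnable sketch of the same pattern with a two-iterator function in place of the question's somefunction. It assumes Spark 3.x (spark-sql) on the classpath and a local master; the object name CogroupExample, the run method and the body of somefunction are illustrative assumptions. Because each Iterator handed to the cogroup function can be traversed only once, the sketch collects the Agg side into a Set and streams the Stg side against it.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Top-level case classes so Spark can derive encoders for them.
final case class Stg(number: String, time: String)
final case class Agg(number: String, time: String)

object CogroupExample {
  // Hypothetical stand-in for the question's `somefunction`: for each Stg record,
  // report whether its time also appears among the Agg records of the same key.
  def somefunction(key: String, stgIt: Iterator[Stg], aggIt: Iterator[Agg]): Iterator[(String, String, Boolean)] = {
    val aggTimes = aggIt.map(_.time).toSet // Iterators are single-pass, so materialise one side
    stgIt.map(s => (key, s.time, aggTimes.contains(s.time)))
  }

  def run(spark: SparkSession): Array[(String, String, Boolean)] = {
    import spark.implicits._

    val stgDS: Dataset[Stg] = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
      ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
      .toDF("number", "time").as[Stg]
    val aggDS: Dataset[Agg] = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
      .toDF("number", "time").as[Agg]

    stgDS.groupByKey(_.number)
      .cogroup(aggDS.groupByKey(_.number))((key, stgIt, aggIt) => somefunction(key, stgIt, aggIt))
      .collect()
      .sortBy(r => (r._1, r._2)) // Spark does not guarantee row order, so sort locally
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cogroup-sketch").getOrCreate()
    try run(spark).foreach(println) // first lines: (1,1,true), (1,2,false), ...
    finally spark.stop()
  }
}
```

Materialising one side is only reasonable when a single key's group fits in executor memory; the cogroup step itself typically shuffles both datasets by key.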