Combine 2 KeyValueGroupedDatasets

I have 2 datasets, each of which I group by key:

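// Assumes a SparkSession named `spark` in scope (as in spark-shell) for toDF/as encoders
import spark.implicits._

// Assumed case classes matching the columns used below
case class Stg(number: String, time: String)
case class Agg(number: String, time: String)

val stgDS = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"), ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
  .toDF("number", "time")
  .as[Stg]

val aggDS = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
  .toDF("number", "time")
  .as[Agg]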

After that, I can apply a function to the values of each group like this:

stgDS.groupByKey(_.number)
  .flatMapGroups { case (k, iterator) => somefunction(iterator) }
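For comparison, the sketch below models what groupByKey(_.number).flatMapGroups(...) does, using plain Scala 2.13+ collections instead of Spark. The function is called once per distinct key with an iterator over that key's rows, and the outputs are concatenated. flatMapGroupsLocal and the row-counting somefunction are hypothetical helpers for illustration only, and Stg here just mirrors the question's case class. Spark gives no ordering guarantee across groups, so the keys are sorted purely to make the result deterministic.

```scala
// Spark-free model of groupByKey(...).flatMapGroups(...) on a local Seq (Scala 2.13+).
// flatMapGroupsLocal and somefunction are hypothetical helpers, not Spark APIs.
object GroupSketch {
  case class Stg(number: String, time: String)

  // Call f once per distinct key with an iterator over that key's rows,
  // then concatenate the outputs (keys sorted only for a deterministic result).
  def flatMapGroupsLocal[K: Ordering, V, R](data: Seq[V])(key: V => K)(
      f: (K, Iterator[V]) => IterableOnce[R]): Seq[R] =
    data.groupBy(key).toSeq.sortBy(_._1).flatMap { case (k, vs) => f(k, vs.iterator) }

  // Hypothetical stand-in for somefunction: emit (key, number of rows in the group).
  def somefunction(k: String, it: Iterator[Stg]): Iterator[(String, Int)] =
    Iterator.single((k, it.size))

  val stg: Seq[Stg] = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
                          ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
    .map { case (n, t) => Stg(n, t) }

  val result: Seq[(String, Int)] =
    flatMapGroupsLocal(stg)(_.number)((k, it) => somefunction(k, it))
}
```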

How can I combine

stgDS.groupByKey(_.number)
aggDS.groupByKey(_.number)

to get something like

(k, (iteratorStg, iteratorAgg))

and then run

.flatMapGroups { case (k, (iteratorStg, iteratorAgg)) => somefunction(iteratorStg, iteratorAgg) }

I was looking at combineByKey, but either it is just another variant of grouping or I don't understand how it works.

A simple join won't work, because I want to loop over the two iterators separately.
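To see why a plain join does not fit, note that an inner join on number pairs every stg row with every agg row that has the same key. Each stg row is therefore repeated once per matching agg row (5 × 3 = 15 rows for key "1"), so the two sides can no longer be walked independently. The sketch below shows this with plain Scala 2.13+ collections on the question's sample data; it illustrates join semantics only and is not Spark code.

```scala
// Plain-Scala illustration (no Spark) of what an inner join on the key produces.
object JoinSketch {
  val stg = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
                ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
  val agg = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))

  // One output row per (stg row, agg row) pair whose keys are equal.
  val joined: Seq[(String, String, String)] =
    for {
      (k1, t1) <- stg
      (k2, t2) <- agg
      if k1 == k2
    } yield (k1, t1, t2)

  // Rows per key after the join: (#stg rows) * (#agg rows) for that key.
  val rowsPerKey: Map[String, Int] =
    joined.groupBy(_._1).map { case (k, rows) => k -> rows.size }
}
```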

Two KeyValueGroupedDatasets can be combined with cogroup.

From the cogroup documentation:

Applies the given function to each cogrouped data. For each unique group, the function will be passed the grouping key and 2 iterators containing all elements in the group from Dataset this and other.
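The behavior described in the documentation can be modeled locally. The sketch below uses a hypothetical cogroupLocal helper over plain Scala 2.13+ collections (it is not a Spark API). Each key that appears in either input gets exactly one call, and a side with no rows for that key contributes an empty iterator, which matches how Spark's cogroup treats a key missing from one dataset. Applied to the question's data, it reports the number of stg and agg rows per key.

```scala
// Local model of cogroup semantics using plain Scala 2.13+ collections.
// cogroupLocal is a hypothetical helper for illustration, not part of Spark.
object CogroupSketch {
  def cogroupLocal[K: Ordering, V, U, R](left: Seq[V], right: Seq[U])(
      leftKey: V => K, rightKey: U => K)(
      f: (K, Iterator[V], Iterator[U]) => IterableOnce[R]): Seq[R] = {
    val l: Map[K, Seq[V]] = left.groupBy(leftKey)
    val r: Map[K, Seq[U]] = right.groupBy(rightKey)
    // One call per key seen on either side; keys sorted only for a deterministic result.
    val keys: Seq[K] = (l.keySet ++ r.keySet).toSeq.sorted
    keys.flatMap { k =>
      // A side with no rows for k contributes an empty iterator.
      f(k, l.getOrElse(k, Seq.empty[V]).iterator, r.getOrElse(k, Seq.empty[U]).iterator)
    }
  }

  val stg = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"),
                ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
  val agg = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))

  // Per key: (key, number of stg rows, number of agg rows).
  val sizes: Seq[(String, Int, Int)] =
    cogroupLocal(stg, agg)(_._1, _._1) { (k, it1, it2) =>
      Iterator.single((k, it1.size, it2.size))
    }
}
```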

Code:

val stgGroupedDS = stgDS.groupByKey(_.number)
val aggGroupedDS = aggDS.groupByKey(_.number)

stgGroupedDS.cogroup(aggGroupedDS)(
    // run whatever logic is required, then return an iterator of results
    (key: String, it1: Iterator[Stg], it2: Iterator[Agg]) =>
      Seq((key, it1.toList.mkString(",") + "//" + it2.toList.mkString(","))).iterator
)
.show(false)

prints

+---+------------------------------------------------------------------------+
|_1 |_2                                                                      |
+---+------------------------------------------------------------------------+
|1  |Stg(1,1),Stg(1,2),Stg(1,3),Stg(1,4),Stg(1,5)//Agg(1,1),Agg(1,4),Agg(1,8)|
|2  |Stg(2,1),Stg(2,2),Stg(2,3),Stg(2,4),Stg(2,5)//Agg(2,2),Agg(2,5)         |
+---+------------------------------------------------------------------------+