从 Scala 中的 RDD Iterable 中提取的最佳方法

Best way to extract from an RDD Iterable in Scala

如果我有以下函数并且我想将 Iterable 拆分为 Var1 的 RDD 和 Var2 的数组,最好的方法是什么?

这是我的函数:

def foo(rdds: RDD[(ID, Iterable[(Var1,Var2)])]) : RDD[(Var1,Array[Var2])] = {
   rdds.map(x => (x._2.map(it => it._1).asInstanceOf[Var1], (x._2.map(it => it._2).toArray)))
}

这是我的示例输入数据:

//RDD[(ID, Iterable[(Var1,Var2)...])]
RDD[("ID1",Iterable[(1,4),(1,8),(1,15)])],
RDD[("ID2",Iterable[(2,18),(2,29)])]

我希望输出如下所示:

//RDD[(Var1,Array[Var2])]
RDD[1,(4,8,15)],
RDD[2,(18,29)]

虽然我上面的代码有效。似乎没有必要遍历 x._2 两次来获取 Iterable 的两个部分,而且我不喜欢必须显式执行 asInstanceOf[Var1] 转换以更改 Iterable 类型的方式。

有没有更好的方法从 Iterable 中提取元素并将它们放入按 Var1 分组的新 RDD 中?

你可以改变你获得的方式var1。根据您显示的数据,Iterable 中的 var1 值相同。所以不需要遍历 Iterable 两次。你可以这样做:

def foo(rdds: RDD[(ID, Iterable[(Var1,Var2)])]) : RDD[(Var1,Array[Var2])] = {
   rdds.map(x => (x._2.head._1, (x._2.map(it => it._2).toArray)))
}

这将给出所需的结果。

注意 - 这仅在 x._2 至少存在一个元素时有效。如果 x._2 有可能为空,那么 x._2.headOption.getOrElse((defaultVar1, defaultVar2))._1 可能是个好主意。