如何拆分 RDD[(String,List[(String,String,String,String)])] 的条目

How to split entries of RDD[(String,List[(String,String,String,String)])]

我需要对具有以下格式的 rdd 进行一些处理:

RDD[(String,List[(String,String,String,String)])]

这是来自 RDD 的示例条目:

(600,List((5,111,1,1), (15,111,1,5), (38,111,2,null))
(700,List((5,111,1,1), (35,111,1,5), (39,111,2,null))

我需要根据在列表中每个元组的第一个元素中找到的时间戳值将每个条目拆分为多个条目。每个条目应包含 20 分钟间隔内的时间戳。

例如,第一个条目应拆分为 2 个条目:

List((5,111,1,1), (15,111,1,5))
List((38,111,2,null))

最后的结果应该是RDD[(String,List[(String,String,String,String)])]:

(600,List((5,111,1,1), (15,111,1,5)))
(600,List((38,111,2,null)))
(700,List((5,111,1,1))
(700,List((35,111,1,5), (39,111,2,null))

任何提示如何执行此操作以及应用哪些功能?

您可以创建一个 splitList 函数,根据您想要的行为从给定记录中拆分列表(不确定我是否准确地遵循了它,描述有点含糊),然后使用 flatMap到"split"每条key-value记录分成几条记录:

def doStuff() = {
  val input: RDD[(String,List[(String,String,String,String)])] = sc.parallelize(Seq(
    ("600",List(("5","111","1","1"), ("15","111","1","5"), ("38","111","2",null))),
    ("700",List(("5","111","1","1"), ("35","111","1","5"), ("39","111","2",null)))
  ))

  def splitList(l: List[(String,String,String,String)]): Iterable[List[(String,String,String,String)]] = {
    l.groupBy(_._1.toInt / 20).values // or any other logic
  }

  val result = input.flatMap { case (k, l) => splitList(l).map(sublist => (k, sublist)) }

  result.foreach(println)
  // prints: 
  // (600,List((38,111,2,null)))
  // (600,List((5,111,1,1), (15,111,1,5)))
  // (700,List((35,111,1,5), (39,111,2,null)))
  // (700,List((5,111,1,1)))
}