按时间戳将 RDD 条目分组到覆盖 X 分钟的条目列表中

Group RDD entries by timestamps into the lists of entries covering X minutes

我在 RDD 中有以下条目:

(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))    

我想将这些条目分组为新条目,以便每个新条目在 30 分钟内包含旧条目列表。这似乎很简单,但在实践中我的代码并没有做到这一点。

val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }

object MyFuncObj {

  def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
    l.groupBy(_.productElement(0).toString.toLong / 30*60).values
  }

}

将此代码应用于上述数据后,我得到以下结果(我只提供时间戳,因为这是关键点):

1473163143  1473163143  1473163148      
1473163139              
1473163696  1473163700  1473163703      
1473168932  

由于这些时间戳是秒,因此应按如下方式分组:

1473163143  1473163143  1473163148  1473163139  1473163696  1473163700  1473163703
1473168932

如何解决这个任务?

更新:

更明确地说:我希望从第一个记录开始时获得 30 分钟的存储桶。

这里有两个问题:

  1. 如果您希望 "buckets" 在第一个条目的时间开始 - 您应该在每个时间戳和第一个时间戳之间使用 delta进行除法之前的时间戳

  2. 30*60 周围缺少括号 - 您除以 30,然后 然后 将该结果乘以 60,而不是除以 (30*60):

    scala> 5000 / 30*60
    res0: Int = 9960
    
    scala> 5000 / (30*60)
    res1: Int = 2
    

总而言之 - 这似乎可以满足您的需求:

// sample data:
val processed = sc.parallelize(List(
  (111,List(List(1473163148L, "abc"))),
  (111,List(List(1473163143L,"def"))),
  (111,List(List(1473163143L,"abd"))),
  (111,List(List(1473163139L,"asd"))),
  (111,List(List(1473163696L,"rtf"))),
  (111,List(List(1473163700L,"rgd"))),
  (111,List(List(1473168932L,"dmf"))))
)


// first - find the lowest timsestamp:
// if input isn't ordered:
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min()

// if input is sorted by timestamp:
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong

def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
  // divide the DELTA between each timestamp and first one by 30 minutes to find bucket:
  l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong) / (30*60)).values
}

// continue as you did:
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y)
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap {
  case (k, l) => createGroups(l).map(sublist => (k, sublist))
}

separated.foreach(println)
// prints:
// (111,List(List(1473168932, dmf)))
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd)))