按时间戳将 RDD 条目分组到覆盖 X 分钟的条目列表中
Group RDD entries by timestamps into the lists of entries covering X minutes
我在 RDD 中有以下条目:
(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))
我想将这些条目分组为新条目,以便每个新条目在 30 分钟内包含旧条目列表。这似乎很简单,但在实践中我的代码并没有做到这一点。
val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }
object MyFuncObj {
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
l.groupBy(_.productElement(0).toString.toLong / 30*60).values
}
}
将此代码应用于上述数据后,我得到以下结果(我只提供时间戳,因为这是关键点):
1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932
由于这些时间戳是秒,因此应按如下方式分组:
1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932
如何解决这个任务?
更新:
更明确地说:我希望从第一个记录开始时获得 30 分钟的存储桶。
这里有两个问题:
如果您希望 "buckets" 在第一个条目的时间开始 - 您应该在每个时间戳和第一个时间戳之间使用 delta进行除法之前的时间戳
30*60
周围缺少括号 - 您除以 30,然后 然后 将该结果乘以 60,而不是除以 (30*60)
:
scala> 5000 / 30*60
res0: Int = 9960
scala> 5000 / (30*60)
res1: Int = 2
总而言之 - 这似乎可以满足您的需求:
// sample data:
val processed = sc.parallelize(List(
(111,List(List(1473163148L, "abc"))),
(111,List(List(1473163143L,"def"))),
(111,List(List(1473163143L,"abd"))),
(111,List(List(1473163139L,"asd"))),
(111,List(List(1473163696L,"rtf"))),
(111,List(List(1473163700L,"rgd"))),
(111,List(List(1473168932L,"dmf"))))
)
// first - find the lowest timsestamp:
// if input isn't ordered:
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min()
// if input is sorted by timestamp:
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
// divide the DELTA between each timestamp and first one by 30 minutes to find bucket:
l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong) / (30*60)).values
}
// continue as you did:
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y)
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap {
case (k, l) => createGroups(l).map(sublist => (k, sublist))
}
separated.foreach(println)
// prints:
// (111,List(List(1473168932, dmf)))
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd)))
我在 RDD 中有以下条目:
(111,List(List(1473163148,abc)))
(111,List(List(1473163143,def)))
(111,List(List(1473163143,abd)))
(111,List(List(1473163139,asd)))
(111,List(List(1473163696,rtf)))
(111,List(List(1473163700,rgd)))
(111,List(List(1473163703,dmf)))
我想将这些条目分组为新条目,以便每个新条目在 30 分钟内包含旧条目列表。这似乎很简单,但在实践中我的代码并没有做到这一点。
val grouped = processed.reduceByKey((x,y) => x ++ y)
val separated = grouped.flatMap { case (k, l) => MyFuncObj.createGroups(l).map(sublist => (k, sublist)) }
object MyFuncObj {
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
l.groupBy(_.productElement(0).toString.toLong / 30*60).values
}
}
将此代码应用于上述数据后,我得到以下结果(我只提供时间戳,因为这是关键点):
1473163143 1473163143 1473163148
1473163139
1473163696 1473163700 1473163703
1473168932
由于这些时间戳是秒,因此应按如下方式分组:
1473163143 1473163143 1473163148 1473163139 1473163696 1473163700 1473163703
1473168932
如何解决这个任务?
更新:
更明确地说:我希望从第一个记录开始时获得 30 分钟的存储桶。
这里有两个问题:
如果您希望 "buckets" 在第一个条目的时间开始 - 您应该在每个时间戳和第一个时间戳之间使用 delta进行除法之前的时间戳
30*60
周围缺少括号 - 您除以 30,然后 然后 将该结果乘以 60,而不是除以(30*60)
:scala> 5000 / 30*60 res0: Int = 9960 scala> 5000 / (30*60) res1: Int = 2
总而言之 - 这似乎可以满足您的需求:
// sample data:
val processed = sc.parallelize(List(
(111,List(List(1473163148L, "abc"))),
(111,List(List(1473163143L,"def"))),
(111,List(List(1473163143L,"abd"))),
(111,List(List(1473163139L,"asd"))),
(111,List(List(1473163696L,"rtf"))),
(111,List(List(1473163700L,"rgd"))),
(111,List(List(1473168932L,"dmf"))))
)
// first - find the lowest timsestamp:
// if input isn't ordered:
val firstTimestamp: Long = processed.values.map { case List((l: Long) :: _) => l }.min()
// if input is sorted by timestamp:
val firstTimestamp: Long = processed.first()._2.head.head.toString.toLong
def createGroups(l: List[List[Any]]): Iterable[List[List[Any]]] = {
// divide the DELTA between each timestamp and first one by 30 minutes to find bucket:
l.groupBy(t => (firstTimestamp - t.productElement(0).toString.toLong) / (30*60)).values
}
// continue as you did:
val grouped: RDD[(Int, List[List[Any]])] = processed.reduceByKey((x, y) => x ++ y)
val separated: RDD[(Int, List[List[Any]])] = grouped.flatMap {
case (k, l) => createGroups(l).map(sublist => (k, sublist))
}
separated.foreach(println)
// prints:
// (111,List(List(1473168932, dmf)))
// (111,List(List(1473163148, abc), List(1473163143, def), List(1473163143, abd), List(1473163139, asd), List(1473163696, rtf), List(1473163700, rgd)))