How to remove duplicates (more like filter based on multiple properties) with Spark RDD in Scala?
As a matter of policy, we never update documents; instead we recreate them with the updated values. When I process events I only want to keep the most recent ones, so I need to filter items out of my RDD based on multiple fields. For example, say an item is:
{
  "name": "Sample",
  "someId": "123",
  "createdAt": "2016-09-21T02:16:32+00:00"
}
And after an update:
{
  "name": "Sample-Updated",
  "someId": "123", # This remains the same
  "createdAt": "2016-09-21T03:16:32+00:00" # This is later than the one above, since the update happens after the document is created
}
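For the snippets that follow, assume each item deserializes into a case class along these lines (the field names mirror the JSON above; the class name Event is an assumption, and createdAt is kept as the raw ISO-8601 string):

// Assumed shape of an item; createdAt stays the ISO-8601 string from the JSON.
case class Event(name: String, someId: String, createdAt: String)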
What I have been doing so far is:
items = items.toList
  .sortBy(_.createdAt).reverse
items = items
  .groupBy(_.someId)
  .map(_._2.head)(breakOut)
But this obviously converts the RDD into a list, and that is the end of Spark. How can I achieve this?
Update
Following the comments, this is what I have so far, but I am having no luck with the "add to set" part:
// Is this correct? (1)
val initialSet = sc.parallelize(List[(String, Event)]())
val addToSet = (eventSet: RDD[(String, Event)],
event: Event) => {
// What to do here? (2)
}
// Is this correct? (3)
val mergeSets = (p1: RDD[(String, Event)],
p2: RDD[(String, Event)]) => p1.union(p2)
// resultSet is of type RDD[(String, RDD[(String, Event)])]. How to get it as RDD[(String, Event)]? (4)
val resultSet = initialSet.aggregateByKey(initialSet)(addToSet, mergeSets)
First use map to get a pair RDD, like (data) -> (name, data). Then use aggregateByKey, which performs an aggregation over all pairs sharing the same key, much like a group by.
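A minimal sketch of that route, assuming the Event case class above. The key point, and where the attempt in the question goes wrong, is that the zero value passed to aggregateByKey must be a plain local value (here an empty Set[Event]), not an RDD; the comment numbers match the questions (1)-(4) above:

import org.apache.spark.rdd.RDD

case class Event(name: String, someId: String, createdAt: String)

def latestPerId(events: RDD[Event]): RDD[(String, Event)] = {
  // Key by the id so pairs with the same someId are aggregated together
  val keyed = events.map(e => (e.someId, e))

  // (1) The zero value: an ordinary empty Set, not an RDD
  val initialSet = Set.empty[Event]
  // (2) Fold a single event into the set accumulated within one partition
  val addToSet = (s: Set[Event], e: Event) => s + e
  // (3) Merge the per-partition sets for the same key
  val mergeSets = (s1: Set[Event], s2: Set[Event]) => s1 ++ s2

  // (4) This yields RDD[(String, Set[Event])]; mapValues brings it
  // back to RDD[(String, Event)] by keeping only the most recent event
  keyed
    .aggregateByKey(initialSet)(addToSet, mergeSets)
    .mapValues(_.maxBy(_.createdAt))
}

Accumulating whole sets only to take their maximum is more work than necessary; the reduceByKey answer below gets the same result without materializing the groups.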
You should be able to use reduceByKey here:
rdd
.keyBy(_.someId)
.reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)
.values
The initial keyBy creates (id, object) pairs, reduceByKey picks the most recent object for each id, and values drops the keys.
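Here is a self-contained sketch of that answer, assuming Spark in local mode and the Event case class from above (the object name DedupByLatest is illustrative). Comparing createdAt as a string works here because ISO-8601 timestamps with the same UTC offset sort correctly lexicographically; with mixed offsets you would parse to an Instant first:

import org.apache.spark.sql.SparkSession

case class Event(name: String, someId: String, createdAt: String)

object DedupByLatest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dedup-events")
      .getOrCreate()
    val sc = spark.sparkContext

    // The two documents from the question: an original and its recreation
    val events = sc.parallelize(Seq(
      Event("Sample", "123", "2016-09-21T02:16:32+00:00"),
      Event("Sample-Updated", "123", "2016-09-21T03:16:32+00:00")
    ))

    val latest = events
      .keyBy(_.someId)                                                 // (someId, event)
      .reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)  // keep the later of any two
      .values                                                          // drop the keys

    latest.collect().foreach(println)
    // prints: Event(Sample-Updated,123,2016-09-21T03:16:32+00:00)

    spark.stop()
  }
}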