如何在 Scala 中使用 Spark RDD 删除重复项(更像是基于多个属性的过滤器)?

How to remove duplicates (more like filter based on multiple properties) with Spark RDD in Scala?

作为一项政策,我们不会更新文档,但会使用更新后的值重新创建。当我处理事件时,我只想保留更新的事件,所以我想根据多个值从我的 RDD 中过滤掉项目。例如,假设一个项目是:

{
    "name": "Sample",
    "someId": "123",
    "createdAt": "2016-09-21T02:16:32+00:00"
}

更新时间:

{
    "name": "Sample-Updated",
    "someId": "123", # This remains the same
    "createdAt": "2016-09-21T03:16:32+00:00" # This is greater than the one of above, since the update operation is done after the document is generated
}

我一直在做的是:

items = items.toList.
      .sortBy(_.createdAt).reverse

    items = items
      .groupBy(_.someId)
      .map(_._2.head)(breakOut)

但这显然将 RDD 转换为列表;火花结束。我该如何实现?

更新

到目前为止,我已经通过查看评论实现了这一点,但在添加到集合中时运气不佳:

// Is this correct? (1)
val initialSet = sc.parallelize(List[(String, Event)]())

val addToSet = (eventSet: RDD[(String, Event)],
                event: Event) => {
    // What to do here? (2)
}

// Is this correct? (3)
val mergeSets = (p1: RDD[(String, Event)],
                 p2: RDD[(String, Event)]) => p1.union(p2)

// resultSet is of type RDD[(String, RDD[(String, Event)])]. How to get it as RDD[(String, Event)]? (4)
val resultSet = initialSet.aggregateByKey(initialSet)(addToSet, mergeSets)

先用map得到一对RDD like (data)->(name,data) 然后使用 aggregateByKey,它将对具有相同键的对执行聚合操作,就像 group by.

您应该可以在此处使用 reduceByKey

rdd
  .keyBy(_.someId)
  .reduceByKey((x, y) => if (x.createdAt > y.createdAt) x else y)
  .values

初始 keyBy 创建 (id, object)reduceByKey 选择最近的对象,values 删除键。