Spark: best way to groupByKey, orderBy and filter
I have 50GB of data with this schema [ID, timestamp, countryId], and I want to get, using Spark 2.2.1, every "change" for each person across all of their events, ordered by timestamp. I mean, if I have these events:
1,20180101,2
1,20180102,3
1,20180105,3
2,20180105,3
1,20180108,4
1,20180109,3
2,20180108,3
2,20180109,6
I want to get this:
1,20180101,2
1,20180102,3
1,20180108,4
1,20180109,3
2,20180105,3
2,20180109,6
For that I developed this code:
val eventsOrdened = eventsDataFrame.orderBy("ID", "timestamp")

val grouped = eventsOrdened
  .rdd.map(x => (x.getString(0), x))
  .groupByKey(300)
  .mapValues(y => cleanEvents(y))
  .flatMap(_._2)
其中 "cleanEvents" 是:
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row

def cleanEvents(ordenedEvents: Iterable[Row]): Iterable[Row] = {
  val ordered = ordenedEvents.toList
  val cleanedList: ListBuffer[Row] = ListBuffer.empty[Row]
  ordered.zipWithIndex.foreach { case (x, index) =>
    // Peek at the next event (or the event itself when this is the last one)
    val next = if (index != ordered.length - 1) ordered(index + 1) else x
    val country = x.get(2)
    val nextCountry = next.get(2)
    val isFirst = cleanedList.isEmpty
    val isLast = index == ordered.length - 1
    if (isFirst) {
      cleanedList.append(x)
    } else if (cleanedList.last.get(2) != country && country != nextCountry) {
      cleanedList.append(x)
    } else if (isLast && cleanedList.last.get(2) != country) {
      cleanedList.append(x)
    }
  }
  cleanedList
}
It works, but it's too slow. Any optimization is welcome!!
Thanks!
You might want to try the following approaches:
Secondary sort. It does low-level partitioning and sorting, and you create a custom partitioner. More info here: http://codingjunkie.net/spark-secondary-sort/
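A minimal sketch of that idea for your dataset (IdPartitioner is just an illustrative name, and I'm assuming the ID and timestamp columns are strings, as the getString(0) in your code suggests):

import org.apache.spark.Partitioner
import org.apache.spark.sql.Row

// Partition by id only, but key by (id, timestamp): after
// repartitionAndSortWithinPartitions, each partition holds every event of an
// id already in time order, so one streaming pass can drop unchanged countries.
class IdPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (id: String, _) => (id.hashCode % numPartitions + numPartitions) % numPartitions
  }
}

val changesOnly = eventsDataFrame.rdd
  .map(r => ((r.getString(0), r.getString(1)), r))          // key = (id, timestamp)
  .repartitionAndSortWithinPartitions(new IdPartitioner(300))
  .map(_._2)                                                 // back to plain rows, sorted per id
  .mapPartitions { rows =>
    var lastId: String = null
    var lastCountry: Any = null
    rows.filter { r =>
      val keep = r.getString(0) != lastId || r.get(2) != lastCountry
      lastId = r.getString(0)
      lastCountry = r.get(2)
      keep
    }
  }

This avoids materializing all events of one id in memory at once, which is where groupByKey tends to hurt.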
Use combineByKey:
import org.apache.spark.SparkContext

case class Details(id: Int, date: Int, cc: Int)

val sc = new SparkContext("local[*]", "App")

val list = List[Details](
  Details(1, 20180101, 2),
  Details(1, 20180102, 3),
  Details(1, 20180105, 3),
  Details(2, 20180105, 3),
  Details(1, 20180108, 4),
  Details(1, 20180109, 3),
  Details(2, 20180108, 3),
  Details(2, 20180109, 6))

val rdd = sc.parallelize(list)

val createCombiner = (v: (Int, Int)) => List[(Int, Int)](v)
val combiner = (c: List[(Int, Int)], v: (Int, Int)) => (c :+ v).sortBy(_._1)
val mergeCombiner = (c1: List[(Int, Int)], c2: List[(Int, Int)]) => (c1 ++ c2).sortBy(_._1)

rdd
  .map(det => (det.id, (det.date, det.cc)))
  .combineByKey(createCombiner, combiner, mergeCombiner)
  .collect()
  .foreach(println)
The output will be something like this:
(1,List((20180101,2), (20180102,3), (20180105,3), (20180108,4), (20180109,3)))
(2,List((20180105,3), (20180108,3), (20180109,6)))
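Note that combineByKey as written only groups and sorts each id's events; to keep only the changes you would still post-process each list, for example like this (my own sketch, not part of the suggestion above):

// Collapse consecutive events that carry the same country code (cc).
val changes = rdd
  .map(det => (det.id, (det.date, det.cc)))
  .combineByKey(createCombiner, combiner, mergeCombiner)
  .flatMapValues { events =>
    events.foldLeft(List.empty[(Int, Int)]) {
      case (acc, ev) if acc.nonEmpty && acc.head._2 == ev._2 => acc  // same cc as last kept event: skip
      case (acc, ev) => ev :: acc                                    // cc changed: keep
    }.reverse
  }

changes.collect().foreach(println)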
Window函数"lag"可以使用:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{isnull, lag}
import spark.implicits._

case class Details(id: Int, date: Int, cc: Int)

val list = List[Details](
  Details(1, 20180101, 2),
  Details(1, 20180102, 3),
  Details(1, 20180105, 3),
  Details(2, 20180105, 3),
  Details(1, 20180108, 4),
  Details(1, 20180109, 3),
  Details(2, 20180108, 3),
  Details(2, 20180109, 6))

val ds = list.toDS()

// Keep an event when there is no previous event for the id (lag is null)
// or when the country code differs from the previous one.
val window = Window.partitionBy("id").orderBy("date")
val result = ds
  .withColumn("lag", lag($"cc", 1).over(window))
  .where(isnull($"lag") || $"lag" =!= $"cc")
  .orderBy("id", "date")

result.show(false)
The result is (the lag column can be dropped):
+---+--------+---+----+
|id |date    |cc |lag |
+---+--------+---+----+
|1 |20180101|2 |null|
|1 |20180102|3 |2 |
|1 |20180108|4 |3 |
|1 |20180109|3 |4 |
|2 |20180105|3 |null|
|2 |20180109|6 |3 |
+---+--------+---+----+
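If you only want the original columns back, drop the helper column afterwards, for example:

val cleaned = result.drop("lag")
cleaned.show(false)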