Maintain order after partitioning by key with groupByKey or aggregateByKey
I have data like this:
Machine, date, hours
123,2014-06-15,15.4
123,2014-06-16,20.3
123,2014-06-18,11.4
131,2014-06-15,12.2
131,2014-06-16,11.5
131,2014-06-17,18.2
131,2014-06-18,19.2
134,2014-06-15,11.1
134,2014-06-16,16.2
I want to partition by the key Machine and compute the lag of hours with an offset of 1 and a default value of 0:
Machine, date, hours, lag
123,2014-06-15,15.4,0
123,2014-06-16,20.3,15.4
123,2014-06-18,11.4,20.3
131,2014-06-15,12.2,0
131,2014-06-16,11.5,12.2
131,2014-06-17,18.2,11.5
131,2014-06-18,19.2,18.2
134,2014-06-15,11.1,0
134,2014-06-16,16.2,11.1
I am using a PairedRDD with the groupByKey method, but it does not produce the values in the expected order.
That's because there is really no given order here. With a few exceptions, RDDs should be considered unordered whenever the transformations you apply require a shuffle.
If you need a specific order, you have to sort the data manually:
case class Record(machine: Long, date: java.sql.Date, hours: Double)
case class RecordWithLag(
machine: Long, date: java.sql.Date, hours: Double, lag: Double
)
def getLag(xs: Seq[Record]): Seq[RecordWithLag] = ???
val rdd = sc.parallelize(List(
Record(123, java.sql.Date.valueOf("2014-06-15"), 15.4),
Record(123, java.sql.Date.valueOf("2014-06-16"), 20.3),
Record(123, java.sql.Date.valueOf("2014-06-18"), 11.4),
Record(131, java.sql.Date.valueOf("2014-06-15"), 12.2),
Record(131, java.sql.Date.valueOf("2014-06-16"), 11.5),
Record(131, java.sql.Date.valueOf("2014-06-17"), 18.2),
Record(131, java.sql.Date.valueOf("2014-06-18"), 19.2),
Record(134, java.sql.Date.valueOf("2014-06-15"), 11.1),
Record(134, java.sql.Date.valueOf("2014-06-16"), 16.2)
))
rdd
.groupBy(_.machine)
.mapValues(_.toSeq.sortWith((x, y) => x.date.compareTo(y.date) < 0))
.mapValues(getLag)
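getLag is left as ??? above. As a minimal sketch (not the author's implementation), assuming the input sequence is already sorted by date as produced by the sortWith step, one possible version pairs each record with the previous record's hours, defaulting to 0 for the first record in each group:

// One possible getLag: prepend the default 0.0 to the hours sequence,
// so that zipping pairs every record with the previous record's hours
// (the extra trailing element is dropped by zip).
def getLag(xs: Seq[Record]): Seq[RecordWithLag] =
  xs.zip(0.0 +: xs.map(_.hours)).map { case (r, prev) =>
    RecordWithLag(r.machine, r.date, r.hours, prev)
  }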
For better performance you should consider updating your Spark distribution to >= 1.4.0 and using DataFrames with window functions:
val df = sqlContext.createDataFrame(rdd)
df.registerTempTable("df")
sqlContext.sql(
""""SELECT *, lag(hours, 1, 0) OVER (
PARTITION BY machine ORDER BY date
) lag FROM df"""
)
+-------+----------+-----+----+
|machine| date|hours| lag|
+-------+----------+-----+----+
| 123|2014-06-15| 15.4| 0.0|
| 123|2014-06-16| 20.3|15.4|
| 123|2014-06-18| 11.4|20.3|
| 131|2014-06-15| 12.2| 0.0|
| 131|2014-06-16| 11.5|12.2|
| 131|2014-06-17| 18.2|11.5|
| 131|2014-06-18| 19.2|18.2|
| 134|2014-06-15| 11.1| 0.0|
| 134|2014-06-16| 16.2|11.1|
+-------+----------+-----+----+
Or:
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.expressions.Window
import sqlContext.implicits._

df.select(
$"*",
lag($"hours", 1, 0).over(
Window.partitionBy($"machine").orderBy($"date")
).alias("lag")
)
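If you need the result back as an RDD of the RecordWithLag case class defined earlier, here is a hedged sketch (assuming the Spark 1.4 DataFrame.map API and that the inferred column order matches the select above):

// Hypothetical follow-up: map each Row of the windowed DataFrame
// back into RecordWithLag; columns are machine, date, hours, lag.
val withLag = df.select(
  $"*",
  lag($"hours", 1, 0).over(
    Window.partitionBy($"machine").orderBy($"date")
  ).alias("lag")
).map { row =>
  RecordWithLag(
    row.getLong(0),
    row.getAs[java.sql.Date](1),
    row.getDouble(2),
    row.getDouble(3)
  )
}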