Spark DataFrame:在 orderBy 之后的 groupBy 是否维持该顺序?
Spark DataFrame: does groupBy after orderBy maintain that order?
我有一个 Spark 2.0 数据框 example
,其结构如下:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序。
我创建了一个聚合器 groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
它帮助我将列连接成字符串以获得最终的数据框:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
我的问题是,如果我这样做 example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
,是否可以保证每小时计数在各自的存储桶中正确排序?
我读到 RDD 不一定是这种情况(参见 ),但 DataFrames 可能有所不同?
如果没有,我该如何解决?
简短的回答是肯定的,每小时计数将保持相同的顺序。
总而言之,在分组之前先排序很重要。此外,排序必须与您实际要排序的组 + 列相同。
例如:
employees
.sort("company_id", "department_id", "employee_role")
.groupBy("company_id", "department_id")
.agg(Aggregators.groupConcat(":", 2) as "count_per_role")
我有一个订单并不总是保持不变的情况:有时是,大部分不是。
我的数据框在 Spark 1.6
上有 200 个分区 运行
df_group_sort = data.orderBy(times).groupBy(group_key).agg(
F.sort_array(F.collect_list(times)),
F.collect_list(times)
)
为了检查顺序,我比较了
的 return 值
F.sort_array(F.collect_list(times))
和
F.collect_list(times)
给予例如(左:sort_array(collect_list());右:collect_list())
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
左列始终排序,而右列仅由排序的块组成。
对于不同的 take() 执行,右列中块的顺序是不同的。
顺序可能相同也可能不同,具体取决于分区数和数据分布。我们可以使用 rdd 本身来解决。
例如::
我将以下示例数据保存在一个文件中,并将其加载到 hdfs 中。
1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800
并执行以下命令:
sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
输出:
Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
也就是说,我们按类型对数据进行分组,然后按价格排序,然后用“~”作为分隔符将 id 连接起来。
上面的命令可以分解如下:
val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)
val groupedData=validData.groupBy(_(1)) //group data rdds
val sortedJoinedData=groupedData.mapValues(x=>{
val list=x.toList
val sortedList=list.sortBy(_(2))
val idOnlyList=sortedList.map(_(0))
idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()
然后我们可以使用命令
选择一个特定的组
sortedJoinedData.filter(_._1=="type1").collect()
输出:
Array[(String, String)] = Array((type1,2~1~5))
正如其他人指出的那样,groupBy
在 orderBy
之后无法维持秩序。您想要做的是使用 Window 函数,按 id 分区并按小时排序。您可以 collect_list
对此进行处理,然后获取结果列表中的最大值(最大),因为它们是累积的(即第一个小时只会在列表中包含自己,第二个小时将在列表中包含 2 个元素,并且依此类推)。
完整示例代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val data = Seq(( "id1", 0, 12),
("id1", 1, 55),
("id1", 23, 44),
("id2", 0, 12),
("id2", 1, 89),
("id2", 23, 34)).toDF("id", "hour", "count")
val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
data.withColumn("collected", collect_list($"count")
.over(Window.partitionBy("id")
.orderBy("hour")))
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count").show
这使我们始终处于 DataFrame 世界中。我还简化了 OP 使用的 UDF 代码。
输出:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+
如果您想解决 Java 中的实现(Scala 和 Python 应该类似):
example.orderBy("hour")
.groupBy("id")
.agg(functions.sort_array(
functions.collect_list(
functions.struct(dataRow.col("hour"),
dataRow.col("count"))),false)
.as("hourly_count"));
不,groupByKey
内的排序不一定会保留,但众所周知,这很难在一个节点的内存中重现。如前所述,发生这种情况的最典型方式是需要重新分区以使 groupByKey
发生。我设法通过在 sort
之后手动执行 repartition
来重现这一点。然后我将结果传递给 groupByKey
.
case class Numbered(num:Int, group:Int, otherData:Int)
// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number
val v =
(1 to 100000)
// Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
.map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
// Be sure they are stored in a small number of partitions
.repartition(2)
.sort($"num")
// Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
.repartition(200)
.groupByKey(_.group)
.mapGroups {
case (g, nums) =>
nums // all you need is .sortBy(_.num) here to fix the problem
.map(_.num)
.mkString("~")
}
.collect()
// Walk through the concatenated strings. If any number ahead
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
if (next < prev) {
println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
}
next
}
}
我有一个 Spark 2.0 数据框 example
,其结构如下:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序。
我创建了一个聚合器 groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
它帮助我将列连接成字符串以获得最终的数据框:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
我的问题是,如果我这样做 example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
,是否可以保证每小时计数在各自的存储桶中正确排序?
我读到 RDD 不一定是这种情况(参见
如果没有,我该如何解决?
简短的回答是肯定的,每小时计数将保持相同的顺序。
总而言之,在分组之前先排序很重要。此外,排序必须与您实际要排序的组 + 列相同。
例如:
employees
.sort("company_id", "department_id", "employee_role")
.groupBy("company_id", "department_id")
.agg(Aggregators.groupConcat(":", 2) as "count_per_role")
我有一个订单并不总是保持不变的情况:有时是,大部分不是。
我的数据框在 Spark 1.6
上有 200 个分区 运行df_group_sort = data.orderBy(times).groupBy(group_key).agg(
F.sort_array(F.collect_list(times)),
F.collect_list(times)
)
为了检查顺序,我比较了
的 return 值F.sort_array(F.collect_list(times))
和
F.collect_list(times)
给予例如(左:sort_array(collect_list());右:collect_list())
2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000
左列始终排序,而右列仅由排序的块组成。 对于不同的 take() 执行,右列中块的顺序是不同的。
顺序可能相同也可能不同,具体取决于分区数和数据分布。我们可以使用 rdd 本身来解决。
例如::
我将以下示例数据保存在一个文件中,并将其加载到 hdfs 中。
1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800
并执行以下命令:
sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()
输出:
Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))
也就是说,我们按类型对数据进行分组,然后按价格排序,然后用“~”作为分隔符将 id 连接起来。 上面的命令可以分解如下:
val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)
val groupedData=validData.groupBy(_(1)) //group data rdds
val sortedJoinedData=groupedData.mapValues(x=>{
val list=x.toList
val sortedList=list.sortBy(_(2))
val idOnlyList=sortedList.map(_(0))
idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()
然后我们可以使用命令
选择一个特定的组sortedJoinedData.filter(_._1=="type1").collect()
输出:
Array[(String, String)] = Array((type1,2~1~5))
groupBy
在 orderBy
之后无法维持秩序。您想要做的是使用 Window 函数,按 id 分区并按小时排序。您可以 collect_list
对此进行处理,然后获取结果列表中的最大值(最大),因为它们是累积的(即第一个小时只会在列表中包含自己,第二个小时将在列表中包含 2 个元素,并且依此类推)。
完整示例代码:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._
val data = Seq(( "id1", 0, 12),
("id1", 1, 55),
("id1", 23, 44),
("id2", 0, 12),
("id2", 1, 89),
("id2", 23, 34)).toDF("id", "hour", "count")
val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}
data.withColumn("collected", collect_list($"count")
.over(Window.partitionBy("id")
.orderBy("hour")))
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count").show
这使我们始终处于 DataFrame 世界中。我还简化了 OP 使用的 UDF 代码。
输出:
+---+------------+
| id|hourly_count|
+---+------------+
|id1| 12:55:44|
|id2| 12:89:34|
+---+------------+
如果您想解决 Java 中的实现(Scala 和 Python 应该类似):
example.orderBy("hour")
.groupBy("id")
.agg(functions.sort_array(
functions.collect_list(
functions.struct(dataRow.col("hour"),
dataRow.col("count"))),false)
.as("hourly_count"));
不,groupByKey
内的排序不一定会保留,但众所周知,这很难在一个节点的内存中重现。如前所述,发生这种情况的最典型方式是需要重新分区以使 groupByKey
发生。我设法通过在 sort
之后手动执行 repartition
来重现这一点。然后我将结果传递给 groupByKey
.
case class Numbered(num:Int, group:Int, otherData:Int)
// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number
val v =
(1 to 100000)
// Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
.map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
// Be sure they are stored in a small number of partitions
.repartition(2)
.sort($"num")
// Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
.repartition(200)
.groupByKey(_.group)
.mapGroups {
case (g, nums) =>
nums // all you need is .sortBy(_.num) here to fix the problem
.map(_.num)
.mkString("~")
}
.collect()
// Walk through the concatenated strings. If any number ahead
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
if (next < prev) {
println(s"*** Next: ${next} less then ${prev} for dataset ${i + 1} ***")
}
next
}
}