Spark2 Dataframe/RDD 分组处理
Spark2 Dataframe/RDD process in groups
我将以下 table 存储在名为 ExampleData 的 Hive 中:
+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1 |10:00| 20|
|1 |11:00| 21|
|2 |10:00| 24|
|2 |11:00| 24|
|2 |12:00| 20|
|3 |11:00| 24|
+--------+-----+---+
我需要能够按站点处理数据。不幸的是,按站点对其进行分区不起作用(有超过 10 万个站点,所有站点的数据量都非常小)。
对于每个站点,我需要分别 select Time 列和 Age 列,并使用它来输入一个函数(理想情况下我想 运行 在执行程序上,而不是driver)
我有一个我认为我希望它如何工作的存根,但是这个解决方案只会在 driver 上 运行,所以它非常慢。我需要找到一种编写它的方法,这样它就会 运行 一个执行者级别:
// fetch a list of distinct sites and return them to the driver
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect
val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")
distinctSites.foreach(row => {
allSiteData.filter("site_id = " + row.get(0))
val times = allSiteData.select("time").collect()
val ages = allSiteData.select("ages").collect()
processTimesAndAges(times, ages)
})
def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
// do some processing
}
我试过在所有节点上广播 distinctSites,但这并没有取得成果。
这似乎是一个简单的概念,但我花了几天时间研究它。我是 Scala/Spark 的新手,如果这是一个荒谬的问题,我深表歉意!
非常感谢任何建议或提示。
RDD API 提供了许多函数,可用于在组中执行操作,从低级 repartition / repartitionAndSortWithinPartitions 开始,到许多 *byKey 方法(combineByKey、groupByKey、reduceByKey 等)结束。 ).
示例:
rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
groupByKey().
forEachPartition( iter => doSomeJob(iter) )
在DataFrame中你可以使用聚合函数,GroupedData class 为最常用的函数提供了多种方法,包括count、max、min、mean和sum
示例:
val df = sc.parallelize(Seq(
(1, 10.3, 10), (1, 11.5, 10),
(2, 12.6, 20), (3, 2.6, 30))
).toDF("Site_ID ", "Time ", "Age")
df.show()
+--------+-----+---+
|Site_ID |Time |Age|
+--------+-----+---+
| 1| 10.3| 10|
| 1| 11.5| 10|
| 2| 12.6| 20|
| 3| 2.6| 30|
+--------+-----+---+
df.groupBy($"Site_ID ").count.show
+--------+-----+
|Site_ID |count|
+--------+-----+
| 1| 2|
| 3| 1|
| 2| 1|
+--------+-----+
注意:正如您提到的解决方案非常慢,您需要使用分区,在您的情况下范围分区是一个不错的选择。
我将以下 table 存储在名为 ExampleData 的 Hive 中:
+--------+-----+---|
|Site_ID |Time |Age|
+--------+-----+---|
|1 |10:00| 20|
|1 |11:00| 21|
|2 |10:00| 24|
|2 |11:00| 24|
|2 |12:00| 20|
|3 |11:00| 24|
+--------+-----+---+
我需要能够按站点处理数据。不幸的是,按站点对其进行分区不起作用(有超过 10 万个站点,所有站点的数据量都非常小)。
对于每个站点,我需要分别 select Time 列和 Age 列,并使用它来输入一个函数(理想情况下我想 运行 在执行程序上,而不是driver)
我有一个我认为我希望它如何工作的存根,但是这个解决方案只会在 driver 上 运行,所以它非常慢。我需要找到一种编写它的方法,这样它就会 运行 一个执行者级别:
// fetch a list of distinct sites and return them to the driver
//(if you don't, you won't be able to loop around them as they're not on the executors)
val distinctSites = spark.sql("SELECT site_id FROM ExampleData GROUP BY site_id LIMIT 10")
.collect
val allSiteData = spark.sql("SELECT site_id, time, age FROM ExampleData")
distinctSites.foreach(row => {
allSiteData.filter("site_id = " + row.get(0))
val times = allSiteData.select("time").collect()
val ages = allSiteData.select("ages").collect()
processTimesAndAges(times, ages)
})
def processTimesAndAges(times: Array[Row], ages: Array[Row]) {
// do some processing
}
我试过在所有节点上广播 distinctSites,但这并没有取得成果。
这似乎是一个简单的概念,但我花了几天时间研究它。我是 Scala/Spark 的新手,如果这是一个荒谬的问题,我深表歉意!
非常感谢任何建议或提示。
RDD API 提供了许多函数,可用于在组中执行操作,从低级 repartition / repartitionAndSortWithinPartitions 开始,到许多 *byKey 方法(combineByKey、groupByKey、reduceByKey 等)结束。 ).
示例:
rdd.map( tup => ((tup._1, tup._2, tup._3), tup) ).
groupByKey().
forEachPartition( iter => doSomeJob(iter) )
在DataFrame中你可以使用聚合函数,GroupedData class 为最常用的函数提供了多种方法,包括count、max、min、mean和sum
示例:
val df = sc.parallelize(Seq(
(1, 10.3, 10), (1, 11.5, 10),
(2, 12.6, 20), (3, 2.6, 30))
).toDF("Site_ID ", "Time ", "Age")
df.show()
+--------+-----+---+
|Site_ID |Time |Age|
+--------+-----+---+
| 1| 10.3| 10|
| 1| 11.5| 10|
| 2| 12.6| 20|
| 3| 2.6| 30|
+--------+-----+---+
df.groupBy($"Site_ID ").count.show
+--------+-----+
|Site_ID |count|
+--------+-----+
| 1| 2|
| 3| 1|
| 2| 1|
+--------+-----+
注意:正如您提到的解决方案非常慢,您需要使用分区,在您的情况下范围分区是一个不错的选择。