Apache Spark RDD:如何根据成对的 RDD 键和值获取最新数据
Apache Spark RDD : How to get latest data based on Paired RDD key and value
我正在从 HDFS 读取数据。每个用户我有多行,我必须 select 每个用户的最新行。
行示例 (RDD [Id: Int, DateTime: String, Name: STRING]
)
1,2016-05-01 01:01:01,testa
2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa
在上面的例子中有两行 Id=1,但是我只想要每个 id 一次(只有最新的一个,它是相应的数据)我想要如下所示的输出 RDD。
2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa
我的想法
我可以将这些数据收集到一个数组中,然后 运行 循环以获得所需的结果,方法是为每个用户保留最新的数据。
我读取收集数据给主节点。我的数据是 30 GB,Master 上的 RAM 是 25 GB。所以我不想尝试这个。
你们能分享完成这个任务的想法和代码吗?
将您的日期字符串转换为时间戳并通过选择具有最新时间戳的元组在 id 上聚合。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
val yourRdd: RDD[Int, String, String] = sc.parallelize(List(
1, "2016-05-01 01:01:01", "testa"
2, "2016-05-02 01:01:01", "testb"
1, "2016-05-05 01:01:01", "testa"
))
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");
val zeroVal = ("", Long.MinValue, "", "")
val rddWithTimestamp = yourRdd
.map({
case (id, datetimeStr, name) => {
val timestamp: Long = LocalDateTime.parse(datetimeStr, dateFormetter)
.toInstant().toEpochMilli()
(id, (id, timestamp, datetimeStr, name))
}
})
val yourRequiredRdd = rddWithTimestamp
.aggregateByKey(zeroValue)(
(t1, t2) => if (t1._2 > t2._2) t1 else t2
(t1, t2) => if (t1._2 > t2._2) t1 else t2
)
你可以使用 DataFrame
API:
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
(1, "2016-05-01 01:01:01", "testA"),
(2, "2016-05-02 01:01:01", "testB"),
(1, "2016-05-05 01:01:01", "testA")))
.toDF("id", "dateTime", "name")
df.withColumn("dateTime", unix_timestamp($"dateTime"))
.groupBy("id", "name")
.max("dateTime")
.withColumnRenamed("max(dateTime)", "dateTime")
.withColumn("dateTime", from_unixtime($"dateTime"))
.show()
这需要 HiveContext
作为您的 SQLContext
:
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
这可能会帮助有需要的人。
val yourRdd = sc.parallelize(List(
(30, ("1122112211111".toLong, "testa", "testa", "testa")),
(1, ("1122112211111".toLong, "testa", "testa", "testa")),
(1, ("1122112211119".toLong, "testa", "testa", "testa")),
(1, ("1122112211112".toLong, "testa", "testa", "testa")),
(2, ("1122112211111".toLong, "testa", "testa", "testa")),
(2, ("1122112211110".toLong, "testa", "testa", "testa"))
))
val addToSet1 = (
s: (Int, (Long, String, String, String)),
v: ((Long, String, String, String))
) => if (s._2._1 > v._1 ) s else (s._1,v)
val mergePartitionSets1 = (
s: (Int, (Long, String, String, String)),
v: (Int, (Long, String, String, String))
) => if (s._2._1 > v._2._1 ) s else v
val ab1 = yourRdd
.aggregateByKey(initialSet)(addToSet1, mergePartitionSets1)
ab1.take(10).foreach(println)
我正在从 HDFS 读取数据。每个用户我有多行,我必须 select 每个用户的最新行。
行示例 (RDD [Id: Int, DateTime: String, Name: STRING]
)
1,2016-05-01 01:01:01,testa
2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa
在上面的例子中有两行 Id=1,但是我只想要每个 id 一次(只有最新的一个,它是相应的数据)我想要如下所示的输出 RDD。
2,2016-05-02 01:01:01,testb
1,2016-05-05 01:01:01,testa
我的想法
我可以将这些数据收集到一个数组中,然后 运行 循环以获得所需的结果,方法是为每个用户保留最新的数据。
我读取收集数据给主节点。我的数据是 30 GB,Master 上的 RAM 是 25 GB。所以我不想尝试这个。
你们能分享完成这个任务的想法和代码吗?
将您的日期字符串转换为时间戳并通过选择具有最新时间戳的元组在 id 上聚合。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
val yourRdd: RDD[Int, String, String] = sc.parallelize(List(
1, "2016-05-01 01:01:01", "testa"
2, "2016-05-02 01:01:01", "testb"
1, "2016-05-05 01:01:01", "testa"
))
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");
val zeroVal = ("", Long.MinValue, "", "")
val rddWithTimestamp = yourRdd
.map({
case (id, datetimeStr, name) => {
val timestamp: Long = LocalDateTime.parse(datetimeStr, dateFormetter)
.toInstant().toEpochMilli()
(id, (id, timestamp, datetimeStr, name))
}
})
val yourRequiredRdd = rddWithTimestamp
.aggregateByKey(zeroValue)(
(t1, t2) => if (t1._2 > t2._2) t1 else t2
(t1, t2) => if (t1._2 > t2._2) t1 else t2
)
你可以使用 DataFrame
API:
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
(1, "2016-05-01 01:01:01", "testA"),
(2, "2016-05-02 01:01:01", "testB"),
(1, "2016-05-05 01:01:01", "testA")))
.toDF("id", "dateTime", "name")
df.withColumn("dateTime", unix_timestamp($"dateTime"))
.groupBy("id", "name")
.max("dateTime")
.withColumnRenamed("max(dateTime)", "dateTime")
.withColumn("dateTime", from_unixtime($"dateTime"))
.show()
这需要 HiveContext
作为您的 SQLContext
:
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
这可能会帮助有需要的人。
val yourRdd = sc.parallelize(List(
(30, ("1122112211111".toLong, "testa", "testa", "testa")),
(1, ("1122112211111".toLong, "testa", "testa", "testa")),
(1, ("1122112211119".toLong, "testa", "testa", "testa")),
(1, ("1122112211112".toLong, "testa", "testa", "testa")),
(2, ("1122112211111".toLong, "testa", "testa", "testa")),
(2, ("1122112211110".toLong, "testa", "testa", "testa"))
))
val addToSet1 = (
s: (Int, (Long, String, String, String)),
v: ((Long, String, String, String))
) => if (s._2._1 > v._1 ) s else (s._1,v)
val mergePartitionSets1 = (
s: (Int, (Long, String, String, String)),
v: (Int, (Long, String, String, String))
) => if (s._2._1 > v._2._1 ) s else v
val ab1 = yourRdd
.aggregateByKey(initialSet)(addToSet1, mergePartitionSets1)
ab1.take(10).foreach(println)