如何在 scala 的数据框中对多列进行 mapreduce?
How can I do mapreduce on multiple columns in dataframe in scala?
我的 spark 数据框如下所示:
+-------+------+-------+------+------+
|userid1|time |userid2|name1 |name2 |
+-------+------+-------+------+------+
|23 |1 |33 |user1 |user2 |
|23 |2 |33 |new |user2 |
|231 |1 |23 |231n |new |
|231 |4 |33 |231n |user2 |
+-------+------+-------+------+------+
每一行有2个userids,有相应的名字,但只有一次。
我想获取每个用户的最新名称。这就像组合 userid1
和 userid2
.
结果应该是:
+------+-----------+
|userid|latest name|
+------+-----------+
|23 |new |
|33 |user2 |
|231 |231n |
+------+-----------+
我该怎么做?
我正在考虑使用 partitonBy
,但我不知道如何合并列 userid1
和 userid2
的结果并获得最新名称。
我也在考虑使用rdd.flatMap((row => row._1 -> row._2),(row => row._3 -> row._2)).reduceByKey(_ max _))
但它是数据框,而不是 rdd,我不确定语法。 daatframe 中的 col 和 $ 真的让我很困惑。(对不起,我对 Spark 比较陌生。)
你能试试这个解决方案吗?
import spark.implicits._
val users = Seq(
(23, 1, 33, "user1", "user2"),
(23, 2, 33, "new", "user2"),
(231, 1, 23, "231", "new"),
(231, 4, 33, "231", "user2")
).toDF("userid1", "time", "userid2", "name1", "name2")
val users1 = users.select(col("userid1").as("userid"), col("name1").as("name"), col("time"))
val users2 = users.select(col("userid2").as("userid"), col("name2").as("name"), col("time"))
val unitedUsers = users1.union(users2)
val resultDf = unitedUsers
.withColumn("max_time", max("time").over(Window.partitionBy("userid")))
.where(col("max_time") === col("time"))
.select(col("userid"), col("name").as("latest_name"))
.distinct()
我的 spark 数据框如下所示:
+-------+------+-------+------+------+
|userid1|time |userid2|name1 |name2 |
+-------+------+-------+------+------+
|23 |1 |33 |user1 |user2 |
|23 |2 |33 |new |user2 |
|231 |1 |23 |231n |new |
|231 |4 |33 |231n |user2 |
+-------+------+-------+------+------+
每一行有2个userids,有相应的名字,但只有一次。
我想获取每个用户的最新名称。这就像组合 userid1
和 userid2
.
结果应该是:
+------+-----------+
|userid|latest name|
+------+-----------+
|23 |new |
|33 |user2 |
|231 |231n |
+------+-----------+
我该怎么做?
我正在考虑使用 partitonBy
,但我不知道如何合并列 userid1
和 userid2
的结果并获得最新名称。
我也在考虑使用rdd.flatMap((row => row._1 -> row._2),(row => row._3 -> row._2)).reduceByKey(_ max _))
但它是数据框,而不是 rdd,我不确定语法。 daatframe 中的 col 和 $ 真的让我很困惑。(对不起,我对 Spark 比较陌生。)
你能试试这个解决方案吗?
import spark.implicits._
val users = Seq(
(23, 1, 33, "user1", "user2"),
(23, 2, 33, "new", "user2"),
(231, 1, 23, "231", "new"),
(231, 4, 33, "231", "user2")
).toDF("userid1", "time", "userid2", "name1", "name2")
val users1 = users.select(col("userid1").as("userid"), col("name1").as("name"), col("time"))
val users2 = users.select(col("userid2").as("userid"), col("name2").as("name"), col("time"))
val unitedUsers = users1.union(users2)
val resultDf = unitedUsers
.withColumn("max_time", max("time").over(Window.partitionBy("userid")))
.where(col("max_time") === col("time"))
.select(col("userid"), col("name").as("latest_name"))
.distinct()