如何使用scala获取交易中的第一条记录?
How to fetch first record in transaction using scala?
我的数据看起来像:
我想获取每个出现时间最短的 dtcode 的第一条记录。
期望的输出:
我想通过使用 scala 获取。
请指导我构建逻辑。
谢谢,
赛亚姆.
--- 使用 Shaido 的正确评论进行编辑 ---
通常,GROUP BY
会处理这个,如果 table 中的顺序无关紧要,
SQL 适用于对大量相关数据进行分组。但是,您的分析取决于数据输入的顺序,并且更改是由一列中的更改触发的,该更改可以在以后重复并且不能聚合,而其他列可以继续更改。
在这种情况下,您需要循环遍历数据并手动检测更改,因为 SQL 没有简单的方法来对此类事物进行分组。我回答的有点太快了,没注意到。
这最好通过存储过程或显示语言来处理。如果你愿意,我可以给你 PHP 中的代码。
另一个作弊是添加 GROUP BY 使用的列(我们称之为 groubycheat
),每次 dtcode
更改时都会增加
SELECT MIN(Currentdatedtime) as Currentdatedtime, dtcode
FROM <tablename>
GROUP BY groupbycheat;
这仍然需要一个循环来添加字段,但是如果您需要多次获取结果,那么这是值得的。否则..否
我对您的问题进行了更多思考,并提出了一个更好的解决方案,即使用数据帧的 Window
功能。首先,所有内容都按 Currentdatedtime
排序,然后检查每一行以查看 dtcode
是否已更改。使用您的示例数据:
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val df = Seq(("7-1-2016 0:00:17",0),("7-1-2016 0:01:17",0),
("7-1-2016 0:02:17",4),("7-1-2016 0:03:17",4),
("7-1-2016 0:04:17",0),("7-1-2016 0:05:17",0),
("7-1-2016 0:06:17",0),("7-1-2016 0:07:17",5)).toDF("Currentdatedtime", "dtcode")
val w = Window.orderBy("Currentdatedtime")
val df2 = df.withColumn("dtcode_change",
when(lag($"dtcode", 1).over(w) === $"dtcode", 0).
otherwise(1))
.filter($"dtcode_change" === 1)
.drop("dtcode_change")
会给你:
+----------------+------+
|Currentdatedtime|dtcode|
+----------------+------+
|7-1-2016 0:00:17| 0|
|7-1-2016 0:02:17| 4|
|7-1-2016 0:04:17| 0|
|7-1-2016 0:07:17| 5|
+----------------+------+
我的数据看起来像:
我想获取每个出现时间最短的 dtcode 的第一条记录。
期望的输出:
我想通过使用 scala 获取。 请指导我构建逻辑。
谢谢, 赛亚姆.
--- 使用 Shaido 的正确评论进行编辑 ---
通常,GROUP BY
会处理这个,如果 table 中的顺序无关紧要,
SQL 适用于对大量相关数据进行分组。但是,您的分析取决于数据输入的顺序,并且更改是由一列中的更改触发的,该更改可以在以后重复并且不能聚合,而其他列可以继续更改。
在这种情况下,您需要循环遍历数据并手动检测更改,因为 SQL 没有简单的方法来对此类事物进行分组。我回答的有点太快了,没注意到。
这最好通过存储过程或显示语言来处理。如果你愿意,我可以给你 PHP 中的代码。
另一个作弊是添加 GROUP BY 使用的列(我们称之为 groubycheat
),每次 dtcode
更改时都会增加
SELECT MIN(Currentdatedtime) as Currentdatedtime, dtcode
FROM <tablename>
GROUP BY groupbycheat;
这仍然需要一个循环来添加字段,但是如果您需要多次获取结果,那么这是值得的。否则..否
我对您的问题进行了更多思考,并提出了一个更好的解决方案,即使用数据帧的 Window
功能。首先,所有内容都按 Currentdatedtime
排序,然后检查每一行以查看 dtcode
是否已更改。使用您的示例数据:
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
val df = Seq(("7-1-2016 0:00:17",0),("7-1-2016 0:01:17",0),
("7-1-2016 0:02:17",4),("7-1-2016 0:03:17",4),
("7-1-2016 0:04:17",0),("7-1-2016 0:05:17",0),
("7-1-2016 0:06:17",0),("7-1-2016 0:07:17",5)).toDF("Currentdatedtime", "dtcode")
val w = Window.orderBy("Currentdatedtime")
val df2 = df.withColumn("dtcode_change",
when(lag($"dtcode", 1).over(w) === $"dtcode", 0).
otherwise(1))
.filter($"dtcode_change" === 1)
.drop("dtcode_change")
会给你:
+----------------+------+
|Currentdatedtime|dtcode|
+----------------+------+
|7-1-2016 0:00:17| 0|
|7-1-2016 0:02:17| 4|
|7-1-2016 0:04:17| 0|
|7-1-2016 0:07:17| 5|
+----------------+------+