How to define a windowing function in a Spark SQL query to avoid repetitive code
I have a query with many leads and lags, so the partition clause is repeated over and over.
If I were using Scala code I could define the window spec once and reuse it (see the sketch after the tables below). Is there a way to reuse the partitioning code in Spark SQL as well?
The objective is to avoid the repetition of "over ( partition by sessionId, deviceId order by entry_datetime )":
SELECT * ,
lag( channel,1,null ) over ( partition by sessionId, deviceId order by entry_datetime ) as prev_chnl,
lead( channel,1,null ) over ( partition by sessionId, deviceId order by entry_datetime ) as next_chnl,
lag( `channel-source`,1,null ) over ( partition by sessionId, deviceId order by entry_datetime ) as prev_chnl_source,
lead( `channel-source`,1,null ) over ( partition by sessionId, deviceId order by entry_datetime ) as next_chnl_source
FROM RAW_VIEW
RAW_VIEW
+------------+-----------+---------------------+---------+-----------------+
|sessionId |deviceId |entry_datetime |channel |channel-source |
+------------+-----------+---------------------+---------+-----------------+
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 15:00:00.0|001 |Internet |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 16:00:00.0|002 |Cable |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 17:00:00.0|003 |Satellite |
+------------+-----------+---------------------+---------+-----------------+
FINAL VIEW
+------------+-----------+---------------------+---------+-----------------+---------+---------+-----------------+-----------------+
|sessionId |deviceId |entry_datetime |channel |channel-source |prev_chnl|next_chnl|prev_chnl_source |next_chnl_source |
+------------+-----------+---------------------+---------+-----------------+---------+---------+-----------------+-----------------+
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 15:00:00.0|001 |Internet |null |002 |null |Cable |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 16:00:00.0|002      |Cable            |001      |003      |Internet         |Satellite        |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 17:00:00.0|003      |Satellite        |002      |null     |Cable            |null             |
+------------+-----------+---------------------+---------+-----------------+---------+---------+-----------------+-----------------+
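For reference, here is roughly what the reusable Scala version looks like. This is a minimal sketch; rawView is a hypothetical DataFrame holding the same data that RAW_VIEW is registered from:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

// Define the window spec once...
val w = Window.partitionBy("sessionId", "deviceId").orderBy("entry_datetime")

// ...and reuse it for every lag/lead column.
val finalView = rawView
  .withColumn("prev_chnl", lag(col("channel"), 1).over(w))
  .withColumn("next_chnl", lead(col("channel"), 1).over(w))
  .withColumn("prev_chnl_source", lag(col("channel-source"), 1).over(w))
  .withColumn("next_chnl_source", lead(col("channel-source"), 1).over(w))

I am looking for an equivalent way to factor out the window definition in plain Spark SQL.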
If you want to do this in spark-sql, one approach is to add a row_number() to the table, computed over your ordered partition. Then create lagged and led versions of the table by adding/subtracting 1 to/from the row_number. Finally, LEFT JOIN the current table with the previous and next versions (on the partition keys as well as row_num, so that rows are never matched across sessions or devices) and select the appropriate columns. For example, try the following:
SELECT curr.*,
prev.channel AS prev_chnl,
next.channel AS next_chnl,
prev.channel_source AS prev_chnl_source,
next.channel_source AS next_chnl_source
FROM (SELECT *,
ROW_NUMBER() OVER (partition by sessionId,
deviceId
order by entry_datetime) AS row_num
FROM RAW_VIEW
) curr
LEFT JOIN (SELECT *,
ROW_NUMBER() OVER (partition by sessionId,
deviceId
order by entry_datetime) + 1 AS row_num
FROM RAW_VIEW
) prev ON (curr.sessionId = prev.sessionId AND curr.deviceId = prev.deviceId AND curr.row_num = prev.row_num)
LEFT JOIN (SELECT *,
ROW_NUMBER() OVER (partition by sessionId,
deviceId
order by entry_datetime) - 1 AS row_num
FROM RAW_VIEW
) next ON (curr.sessionId = next.sessionId AND curr.deviceId = next.deviceId AND curr.row_num = next.row_num)
ORDER BY curr.entry_datetime
This results in:
+------------+-----------+---------------------+-------+--------------+-------+---------+---------+----------------+----------------+
|sessionId |deviceId |entry_datetime |channel|channel_source|row_num|prev_chnl|next_chnl|prev_chnl_source|next_chnl_source|
+------------+-----------+---------------------+-------+--------------+-------+---------+---------+----------------+----------------+
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 15:00:00.0|001 |Internet |1 |null |002 |null |Cable |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 16:00:00.0|002 |Cable |2 |001 |003 |Internet |Satellite |
|SESSION-ID-1|DEVICE-ID-1|2018-04-09 17:00:00.0|003 |Satellite |3 |002 |null |Cable |null |
+------------+-----------+---------------------+-------+--------------+-------+---------+---------+----------------+----------------+
You should be able to define a named window and reference it in the query:
SELECT * ,
lag(channel, 1) OVER w AS prev_chnl,
lead(channel, 1) OVER w AS next_chnl,
lag(`channel-source`, 1) OVER w AS prev_chnl_source,
lead(`channel-source`, 1) OVER w AS next_chnl_source
FROM raw_view
WINDOW w AS (PARTITION BY sessionId, deviceId ORDER BY entry_datetime)
But it looks like this feature is currently broken.