引导功能不能正常工作

Lead function is not working properly

我正在尝试使用 Spark 的 "lead" 函数,但在使用它时出现了奇怪的行为。

这是我的输入数据示例(按 "row_id" 和 "dt_capt" 排序):

    row_id,     dt_capt,                    dt_capt_time
_____________________________________________________________
1)  1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005
2)  1-1BHPHNU,  2016-07-20 21:07:05.779006, 210705779006
3)  1-1BZ1F5B,  2016-07-20 21:07:05.779008, 210705779008
4)  1-1IE18-116,2016-07-20 09:48:52.411000, 94852411000
5)  1-1JEVXD,   2016-07-20 09:05:16.502001, 90516502001
6)  1-1JGTHE,   2016-07-20 09:04:24.183001, 90424183001
7)  1-1KQA6M8,  2016-07-20 21:06:02.483002, 210602483002
8)  1-1WI4W1P,  2016-07-20 09:04:06.163001, 90406163001
9)  1-1XIZRHX,  2016-07-20 00:00:27.646000, 27646000
10) 1-1Y932X9,  2016-07-20 16:47:51.774001, 164751774001
11) 1-1Y932X9,  2016-07-20 21:39:29.662002, 213929662002
12) 1-1YYW7-3,  2016-07-20 13:32:18.110004, 133218110004
13) 1-1YYW7-3,  2016-07-20 13:32:18.114001, 133218114001
14) 1-21CY-79,  2016-07-20 18:12:16.663003, 181216663003
15) 1-21CY-79,  2016-07-20 18:12:16.663008, 181216663008
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002

我的代码:

#All the imports
from pyspark import SparkConf,SparkContext
from pyspark import HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import lead,col,lit,udf
from pyspark.sql.types import *
import io 
import sys

conf = SparkConf().set('spark.kryoserializer.buffer.max', '512m')
sc=SparkContext(conf=conf)
hc=HiveContext(sc)

#Show only errors
sc.setLogLevel("ERROR")

#Get input data
delta = hc.sql("SELECT ROW_ID,DT_CAPT,UNIX_TIMESTAMP(DT_CAPT) as     DT_CAPT_TS,CAST(regexp_replace(split(LKR_GRC_S_CONTACT.dt_capt, ' ')[1], \"\:|    [.]\", '') AS BIGINT) AS dt_capt_time FROM int_lkr.lkr_grc_s_contact WHERE     regexp_replace(to_date(DT_CAPT), '-', '') == \"20160720\" ORDER BY     ROW_ID,DT_CAPT")
delta.registerTempTable("delta")

#Compute "mvt_suiv" with the lead function
w = Window().partitionBy("ROW_ID").orderBy("dt_capt_time")
delta2 = delta.select("row_id","dt_capt","dt_capt_time",lead("dt_capt_time").over(w).alias("mvt_suiv"))

结果输出:

    row_id,     dt_capt,                    dt_capt_time,   mvt_suiv
_____________________________________________________________________________
1)  1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005,   NULL
2)  1-1BHPHNU,  2016-07-20 21:07:05.779006, 210705779006,   NULL
3)  1-1BZ1F5B,  2016-07-20 21:07:05.779008, 210705779008,   NULL
4)  1-1IE18-116,2016-07-20 09:48:52.411000, 94852411000,    NULL
5)  1-1JEVXD,   2016-07-20 09:05:16.502001, 90516502001,    171798691866
6)  1-1JGTHE,   2016-07-20 09:04:24.183001, 90424183001,    NULL
7)  1-1KQA6M8,  2016-07-20 21:06:02.483002, 210602483002,   NULL
8)  1-1WI4W1P,  2016-07-20 09:04:06.163001, 90406163001,    NULL
9)  1-1XIZRHX,  2016-07-20 00:00:27.646000, 27646000,       NULL
10) 1-1Y932X9,  2016-07-20 16:47:51.774001, 164751774001,   213929662002
11) 1-1Y932X9,  2016-07-20 21:39:29.662002, 213929662002,   NULL
12) 1-1YYW7-3,  2016-07-20 13:32:18.110004, 133218110004,   133218110004
13) 1-1YYW7-3,  2016-07-20 13:32:18.114001, 133218114001,   133218114001
14) 1-21CY-79,  2016-07-20 18:12:16.663003, 181216663003,   181216663008
15) 1-21CY-79,  2016-07-20 18:12:16.663008, 181216663008,   NULL
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003,   213929662006
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006,   NULL
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001,   180952584005
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005,   181219676002
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002,   NULL

如您所见,它没有正常工作。 (第 5、12 和 13 行)。

第 5 行:应该是 "NULL" 因为没有下一行 ROW_ID

第 12 行:应该是 "133218114001" 而不是 "133218110004"

第 13 行:应该是 "NULL" 因为没有下一行 ROW_ID.

我做错了什么吗?我尝试使用字符串值和整数值,但我仍然对 "lead" 有奇怪的行为(与 "lag" 相同)。我感觉 Spark 中的 "window" 函数仍然包含很多错误(至少在 Spark 1.5 中)。有人可以证实吗?

Cloudera 版本:5.5.1

Spark 版本:1.5.0

谢谢!

我看不出你的代码有什么问题(也许我漏掉了),但重点是 1.5.0 的 window 函数实现有问题。检查这一堆问题,也许你遇到了其中一个(或副作用):

https://issues.apache.org/jira/browse/SPARK-11009

已在 CDH 5.5.4 中修复

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_fixed_in_55.html