Spark Window Function rangeBetween producing incorrect results
I am trying to use rangeBetween in a window function on a Spark DataFrame, over a column of type Long, but the results of the window are not correct. Am I doing something wrong?
Here is my DataFrame:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
  Seq(
    Row("2014-11-01 08:10:10.12345", 141482941012345L),
    Row("2014-11-01 09:10:10.12345", 141483301012345L),
    Row("2014-11-01 10:10:10.12345", 141483661012345L),
    Row("2014-11-02 10:10:10.12345", 141492301012345L),
    Row("2014-11-03 10:10:10.12345", 141500941012345L),
    Row("2014-11-04 10:10:10.12345", 141509581012345L),
    Row("2014-11-05 10:10:10.12345", 141518221012345L),
    Row("2014-11-06 10:10:10.12345", 141526861012345L),
    Row("2014-11-07 10:10:10.12345", 141535501012345L),
    Row("2014-11-08 10:10:10.12345", 141544141012345L)
  )
)

val schema = new StructType()
  .add(StructField("dateTime", StringType, true))
  .add(StructField("unixTime", LongType, true))

val df = spark.createDataFrame(rowsRdd, schema)
df.show(10, false)
df.printSchema()
Which is:
+-------------------------+---------------+
|dateTime |unixTime |
+-------------------------+---------------+
|2014-11-01 08:10:10.12345|141482941012345|
|2014-11-01 09:10:10.12345|141483301012345|
|2014-11-01 10:10:10.12345|141483661012345|
|2014-11-02 10:10:10.12345|141492301012345|
|2014-11-03 10:10:10.12345|141500941012345|
|2014-11-04 10:10:10.12345|141509581012345|
|2014-11-05 10:10:10.12345|141518221012345|
|2014-11-06 10:10:10.12345|141526861012345|
|2014-11-07 10:10:10.12345|141535501012345|
|2014-11-08 10:10:10.12345|141544141012345|
+-------------------------+---------------+
And the schema:
root
|-- dateTime: string (nullable = true)
|-- unixTime: long (nullable = true)
The first column is the timestamp of an event (a string; it is not actually used), and the second column is the corresponding unix time of that timestamp, in units of 10^-5 seconds.
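As a quick sanity check of that encoding (a small sketch added here, not part of the original question), dividing the stored value by 100000 recovers the familiar unix time in seconds:

scala> 141482941012345L / 100000L
res0: Long = 1414829410   // 2014-11-01 08:10:10 UTC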
Now I want to count the number of events that fall in a window ending at the current row. For example, for a 3 hour window I do:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, count}
val hour: Long = 60 * 60 * 100000L  // one hour in 10^-5 second units
val w = Window.orderBy(col("unixTime")).rangeBetween(-3 * hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
Which returns correctly:
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1 |
|2014-11-01 09:10:10.12345|141483301012345|2 |
|2014-11-01 10:10:10.12345|141483661012345|3 |
|2014-11-02 10:10:10.12345|141492301012345|1 |
|2014-11-03 10:10:10.12345|141500941012345|1 |
|2014-11-04 10:10:10.12345|141509581012345|1 |
|2014-11-05 10:10:10.12345|141518221012345|1 |
|2014-11-06 10:10:10.12345|141526861012345|1 |
|2014-11-07 10:10:10.12345|141535501012345|1 |
|2014-11-08 10:10:10.12345|141544141012345|1 |
+-------------------------+---------------+---+
Here is the result for a 6 hour window. Why is the result now all 0?
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-6*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|0 |
|2014-11-01 09:10:10.12345|141483301012345|0 |
|2014-11-01 10:10:10.12345|141483661012345|0 |
|2014-11-02 10:10:10.12345|141492301012345|0 |
|2014-11-03 10:10:10.12345|141500941012345|0 |
|2014-11-04 10:10:10.12345|141509581012345|0 |
|2014-11-05 10:10:10.12345|141518221012345|0 |
|2014-11-06 10:10:10.12345|141526861012345|0 |
|2014-11-07 10:10:10.12345|141535501012345|0 |
|2014-11-08 10:10:10.12345|141544141012345|0 |
+-------------------------+---------------+---+
And here is what happens for 12 hours. Why is the result now all 1?
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-12*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))
+-------------------------+---------------+---+
|dateTime |unixTime |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1 |
|2014-11-01 09:10:10.12345|141483301012345|1 |
|2014-11-01 10:10:10.12345|141483661012345|1 |
|2014-11-02 10:10:10.12345|141492301012345|1 |
|2014-11-03 10:10:10.12345|141500941012345|1 |
|2014-11-04 10:10:10.12345|141509581012345|1 |
|2014-11-05 10:10:10.12345|141518221012345|1 |
|2014-11-06 10:10:10.12345|141526861012345|1 |
|2014-11-07 10:10:10.12345|141535501012345|1 |
|2014-11-08 10:10:10.12345|141544141012345|1 |
+-------------------------+---------------+---+
What is going on here? It does not work correctly for any larger rangeBetween value.
Edit: September 11, 2017
Could this be related to the following issue?
[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary #18540. Has it been implemented in the latest version of Spark?
It is indeed related to the linked issue. 6 * hour is 2160000000, which is larger than Integer.MAX_VALUE (2147483647), so it causes an integer overflow:
scala> (6 * hour).toInt
res4: Int = -2134967296
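This also explains the specific values observed in the question (an illustrative continuation of the same REPL session, not part of the original answer): after truncation to Int, the lower bound of the 6 hour window wraps to a large positive number, so the frame is empty and the count is 0, while the lower bound of the 12 hour window wraps to roughly -250 seconds, so the frame contains only the current row and the count is 1.

scala> (-6 * hour).toInt   // wraps to a large positive lower bound -> empty frame -> count 0
res5: Int = 2134967296

scala> (-12 * hour).toInt  // wraps to about -250 s in 10^-5 s units -> only the current row -> count 1
res6: Int = -25032704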
The problem has been fixed in the current master and will be released in Spark 2.3.
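For versions before that fix, a minimal workaround sketch (my own suggestion, not part of the original answer) is to express the range column in coarser units, e.g. whole seconds, so that even a 12 hour boundary (43200) stays far below Integer.MAX_VALUE. The column name unixTimeSec below is hypothetical:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, count}

// Rescale 10^-5 s units to seconds; a 12 hour range is then only 43200, well inside the Int range.
val dfSec = df.withColumn("unixTimeSec", (col("unixTime") / 100000L).cast("long"))
val hourSec: Long = 60 * 60  // one hour in seconds
val wSec = Window.orderBy(col("unixTimeSec")).rangeBetween(-12 * hourSec, 0)
val dfCounted = dfSec.withColumn("cts", count(col("dateTime")).over(wSec)).orderBy(asc("unixTimeSec"))

The trade-off is that window boundaries are then resolved only to whole seconds, which is harmless for hour-scale windows like these.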