两个整数 returns None 与 PySpark 的区别?

Difference between two integers returns None with PySpark?

我正在尝试在我的 spark 数据中集成由滚动时间 window 定义的 "user session"。

我一直在使用这个问题:

与我不同的是我希望我的时间 window 大约是 5 个小时,所以我不能使用 datediff 会 returns 几天。

这是我的数据集:

[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]

我只需要添加一个列,该列将按用户为会话编制索引。 (比如auction_id9999是session 0,auction_id8888和auction_id5555是session 1(因为9999到8888有很多天,8888到5555只有几分钟)。我们从 0 开始为下一个用户建立索引。

这是我的代码:

# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))

# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")

# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned. 
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))

# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")

# We increment for each indicator = 1
subgroup = sum(test).over(w).alias("session_index")

# We get everything
df = df.select("*", subgroup)

最后,每个人的session_index都是0。问题来自 diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0)) 行。在这里,每次返回的是 lit(0) (我通过更改 0 值进行检查)。所以我尝试通过更改几行来简化我的脚本:

test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")

我删除了 coalesce 和 abs 函数。 session_index 是每行的 "None"。

如果我用 test = "timestamp" 替换测试,这会很好:我会得到时间戳的累加和。

如果我将它替换为 test = lag("timestamp", 1).over(w),它也会很好,我会为用户的第一行获得 None(因为没有上一行),然后是累计金额。

当我尝试减去我的两个整数时出现问题。但我不明白为什么?它是两个整数,结果也应该是一个整数,不是吗?

感谢您为我提供的任何帮助。

如果它是两个整数之间的差值会很奇怪,但事实并非如此。让我们再看看罪魁祸首:

coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))

减法的left-hand操作数是strstr 没有可以对 Column 进行操作的 __sub__,所以我们使用 right-hand 操作数的 __rsub__。一般来说,Column 的 dunder 方法将标准 Python 类型解释为文字。因此,您的代码实际上尝试从字符串 "timestamp" 中减去整数,结果未定义。

TL;DR 您应该使用 Column 作为 left-hand 操作数:

from pyspark.sql.functions import col

coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))