两个整数 returns None 与 PySpark 的区别?
Difference between two integers returns None with PySpark?
我正在尝试在我的 spark 数据中集成由滚动时间 window 定义的 "user session"。
我一直在使用这个问题:
与我不同的是我希望我的时间 window 大约是 5 个小时,所以我不能使用 datediff 会 returns 几天。
这是我的数据集:
[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]
我只需要添加一个列,该列将按用户为会话编制索引。 (比如auction_id9999是session 0,auction_id8888和auction_id5555是session 1(因为9999到8888有很多天,8888到5555只有几分钟)。我们从 0 开始为下一个用户建立索引。
这是我的代码:
# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))
# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")
# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")
# We increment for each indicator = 1
subgroup = sum(test).over(w).alias("session_index")
# We get everything
df = df.select("*", subgroup)
最后,每个人的session_index都是0。问题来自 diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
行。在这里,每次返回的是 lit(0) (我通过更改 0 值进行检查)。所以我尝试通过更改几行来简化我的脚本:
test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")
我删除了 coalesce 和 abs 函数。 session_index 是每行的 "None"。
如果我用 test = "timestamp"
替换测试,这会很好:我会得到时间戳的累加和。
如果我将它替换为 test = lag("timestamp", 1).over(w)
,它也会很好,我会为用户的第一行获得 None(因为没有上一行),然后是累计金额。
当我尝试减去我的两个整数时出现问题。但我不明白为什么?它是两个整数,结果也应该是一个整数,不是吗?
感谢您为我提供的任何帮助。
如果它是两个整数之间的差值会很奇怪,但事实并非如此。让我们再看看罪魁祸首:
coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
减法的left-hand操作数是str
。 str
没有可以对 Column
进行操作的 __sub__
,所以我们使用 right-hand 操作数的 __rsub__
。一般来说,Column
的 dunder 方法将标准 Python 类型解释为文字。因此,您的代码实际上尝试从字符串 "timestamp" 中减去整数,结果未定义。
TL;DR 您应该使用 Column
作为 left-hand 操作数:
from pyspark.sql.functions import col
coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))
我正在尝试在我的 spark 数据中集成由滚动时间 window 定义的 "user session"。
我一直在使用这个问题:
与我不同的是我希望我的时间 window 大约是 5 个小时,所以我不能使用 datediff 会 returns 几天。
这是我的数据集:
[Row(auction_id_64=9999, datetime=datetime.datetime(2016, 12, 5, 3, 42, 17), user_id_64=123),
Row(auction_id_64=8888, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=123),
Row(auction_id_64=5555, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=123),
Row(auction_id_64=4444, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=456),
Row(auction_id_64=3333, datetime=datetime.datetime(2016, 12, 7, 3, 40, 54), user_id_64=456),
Row(auction_id_64=7777, datetime=datetime.datetime(2016, 12, 7, 18, 42, 17), user_id_64=456),
Row(auction_id_64=6666, datetime=datetime.datetime(2016, 12, 7, 3, 7, 23), user_id_64=789),
Row(auction_id_64=2222, datetime=datetime.datetime(2016, 12, 7, 3, 2, 7), user_id_64=789),
Row(auction_id_64=1111, datetime=datetime.datetime(2016, 12, 7, 3, 41, 49), user_id_64=789),
Row(auction_id_64=1212, datetime=datetime.datetime(2016, 12, 9, 3, 40, 54), user_id_64=789)]
我只需要添加一个列,该列将按用户为会话编制索引。 (比如auction_id9999是session 0,auction_id8888和auction_id5555是session 1(因为9999到8888有很多天,8888到5555只有几分钟)。我们从 0 开始为下一个用户建立索引。
这是我的代码:
# Add a timestamp (integer) column
df = df.withColumn('timestamp', unix_timestamp(df['datetime']).cast('integer'))
# We partition by user and order by timestamp
w = Window.partitionBy("user_id_64").orderBy("timestamp")
# we compute the absolute difference between timestamp and timestamp from the previous line. If no result, 0 is returned.
diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
# If difference higher than 5 hours
indicator = (diff > 5 * 60 * 60).cast("integer")
# We increment for each indicator = 1
subgroup = sum(test).over(w).alias("session_index")
# We get everything
df = df.select("*", subgroup)
最后,每个人的session_index都是0。问题来自 diff = coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
行。在这里,每次返回的是 lit(0) (我通过更改 0 值进行检查)。所以我尝试通过更改几行来简化我的脚本:
test = "timestamp" - lag("timestamp", 1).over(w)
subgroup = sum(test).over(w).alias("session_index")
我删除了 coalesce 和 abs 函数。 session_index 是每行的 "None"。
如果我用 test = "timestamp"
替换测试,这会很好:我会得到时间戳的累加和。
如果我将它替换为 test = lag("timestamp", 1).over(w)
,它也会很好,我会为用户的第一行获得 None(因为没有上一行),然后是累计金额。
当我尝试减去我的两个整数时出现问题。但我不明白为什么?它是两个整数,结果也应该是一个整数,不是吗?
感谢您为我提供的任何帮助。
如果它是两个整数之间的差值会很奇怪,但事实并非如此。让我们再看看罪魁祸首:
coalesce(abs("timestamp" - lag("timestamp", 1).over(w)), lit(0))
减法的left-hand操作数是str
。 str
没有可以对 Column
进行操作的 __sub__
,所以我们使用 right-hand 操作数的 __rsub__
。一般来说,Column
的 dunder 方法将标准 Python 类型解释为文字。因此,您的代码实际上尝试从字符串 "timestamp" 中减去整数,结果未定义。
TL;DR 您应该使用 Column
作为 left-hand 操作数:
from pyspark.sql.functions import col
coalesce(abs(col("timestamp") - lag("timestamp", 1).over(w)), lit(0))