Spark window function with conditional restart
I am trying to compute the portion of the activity values that does not originate from extra credit.
Input:
+------+--------+------+
|period|activity|credit|
+------+--------+------+
| 1| 5| 0|
| 2| 0| 3|
| 3| 4| 0|
| 4| 0| 3|
| 5| 1| 0|
| 6| 1| 0|
| 7| 5| 0|
| 8| 0| 1|
| 9| 0| 1|
| 10| 5| 0|
+------+--------+------+
Output:
rdd = sc.parallelize([(1,5,0,5),(2,0,3,0),(3,4,0,1),(4,0,3,0),(5,1,0,0),(6,1,0,0),(7,5,0,4),(8,0,1,0),(9,0,1,0),(10,5,0,3)])
df = rdd.toDF(["period","activity","credit","realActivity"])
+------+--------+------+------------+
|period|activity|credit|realActivity|
+------+--------+------+------------+
| 1| 5| 0| 5|
| 2| 0| 3| 0|
| 3| 4| 0| 1|
| 4| 0| 3| 0|
| 5| 1| 0| 0|
| 6| 1| 0| 0|
| 7| 5| 0| 4|
| 8| 0| 1| 0|
| 9| 0| 1| 0|
| 10| 5| 0| 3|
+------+--------+------+------------+
I am trying to build a credit-balance column that is incremented or decremented depending on the row type, but I cannot restart it conditionally based on its own value (every time it drops below zero). It looks like a recursive problem, and I am not sure how to express it in a PySpark-friendly way. Obviously, I cannot do the following, self-referencing the previous value of the column being computed:
from pyspark.sql import Window
from pyspark.sql.functions import lag

w = Window.orderBy("period")
df = df.withColumn("realActivity", lag("realActivity", 1, 0).over(w) - lag("credit", 1, 0).over(w) - lag("activity", 1, 0).over(w))
Update:
As pointed out, this cannot be done with a window calculation. So instead I would like to do something like the snippet below to compute creditBalance, from which I can then derive realActivity.
df['creditBalance'] = 0
for i in range(1, len(df)):
    if df.loc[i-1, 'creditBalance'] > 0:
        df.loc[i, 'creditBalance'] = df.loc[i-1, 'creditBalance'] + df.loc[i, 'credit'] - df.loc[i, 'activity']
    elif df.loc[i, 'credit'] > 0:
        df.loc[i, 'creditBalance'] = df.loc[i, 'credit'] - df.loc[i, 'activity']
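The balance logic itself is easy to sanity-check with a plain Python loop over the sample rows (a sketch independent of pandas and Spark; `min` caps the credit usage at the available balance):

```python
# (period, activity, credit) rows from the input table
rows = [(1, 5, 0), (2, 0, 3), (3, 4, 0), (4, 0, 3), (5, 1, 0),
        (6, 1, 0), (7, 5, 0), (8, 0, 1), (9, 0, 1), (10, 5, 0)]

balance = 0
real_activity = []
for period, activity, credit in rows:
    balance += credit                      # credits top up the balance
    used = min(activity, balance)          # activity consumes credit first
    balance -= used                        # balance never drops below zero
    real_activity.append(activity - used)  # the remainder is "real" activity

print(real_activity)  # [5, 0, 1, 0, 0, 0, 4, 0, 0, 3]
```

This reproduces the realActivity column of the expected output above.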
Now my only question is: how do I apply this "local" function to each group of a Spark dataframe?
- Write the dataframe to files per group and process them locally?
- A custom map, collecting the rows of each group for local execution?
- Collapse each group's rows into a single row and process that?
- Anything else?
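In Spark 3.x there is another option: `groupBy(...).applyInPandas(...)` runs an ordinary pandas function once per group, so the sequential loop can stay as-is. A sketch (assuming pandas is available and a `userID` grouping column; column and function names here are illustrative, not from the original post):

```python
import pandas as pd

def add_credit_balance(pdf: pd.DataFrame) -> pd.DataFrame:
    # Sequential balance replay for one group, ordered by period.
    pdf = pdf.sort_values("period").reset_index(drop=True)
    balance, real = 0, []
    for _, row in pdf.iterrows():
        balance += row["credit"]
        used = min(row["activity"], balance)
        balance -= used
        real.append(row["activity"] - used)
    pdf["realActivity"] = real
    return pdf

# In Spark this would be applied per group (the schema must list every
# output column, including the grouping column):
# result = df.groupBy("userID").applyInPandas(
#     add_credit_balance,
#     schema="userID long, period long, activity long, credit long, realActivity long")

# The same function can be tested locally on a plain pandas DataFrame:
pdf = pd.DataFrame({"period": [1, 2, 3], "activity": [5, 0, 4], "credit": [0, 3, 0]})
print(add_credit_balance(pdf)["realActivity"].tolist())  # [5, 0, 1]
```

Because the function is ordinary pandas code, it can be unit-tested without a Spark session, which is the main advantage over a string-based UDF.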
@潘森,
I solved it with the code below. It may be useful if you are trying to solve a similar problem.
def creditUsage(rows):
    '''
    Input:
        timestamp, activity, credit
        ['1;5;0', '2;0;3', '3;4;0', '4;0;3', '5;1;0', '6;1;0', '7;5;0', '8;0;1', '9;0;1', '10;5;0']
    Output:
        [timestamp;creditUsage]
    '''
    # sort the rows by timestamp before replaying the balance
    timestamps = [int(r.split(";")[0]) for r in rows]
    rows = [r for _, r in sorted(zip(timestamps, rows))]
    timestamp, trActivity, credit = zip(*[(int(ts), float(act), float(rbonus))
                                          for r in rows
                                          for [ts, act, rbonus] in [r.split(";")]])
    creditUsage = [0.0] * len(credit)
    creditBalance = 0.0
    for i in range(len(trActivity)):
        creditBalance += credit[i]
        # if the balance covers the activity, the whole activity is credit usage;
        # otherwise only the remaining balance is used up
        creditUsage[i] = creditBalance if creditBalance - trActivity[i] < 0 else trActivity[i]
        creditBalance -= creditUsage[i]
    return ["{0};{1:02}".format(t_, u_) for t_, u_ in zip(timestamp, creditUsage)]
from pyspark.sql.functions import udf, col, concat_ws, collect_list, explode, split
from pyspark.sql.types import ArrayType, StringType

realBonusUDF = udf(creditUsage, ArrayType(StringType()))

a = df.withColumn('data', concat_ws(';', col('period'), col('activity'), col('credit'))) \
    .groupBy('userID').agg(collect_list('data').alias('data')) \
    .withColumn('data', realBonusUDF('data')) \
    .withColumn("data", explode("data")) \
    .withColumn("data", split("data", ";")) \
    .withColumn("timestamp", col('data')[0].cast("int")) \
    .withColumn("creditUsage", col('data')[1].cast("float")) \
    .drop('data')
Output:
+------+---------+-----------+
|userID|timestamp|creditUsage|
+------+---------+-----------+
| 123| 1| 0.0|
| 123| 2| 0.0|
| 123| 3| 3.0|
| 123| 4| 0.0|
| 123| 5| 1.0|
| 123| 6| 1.0|
| 123| 7| 1.0|
| 123| 8| 0.0|
| 123| 9| 0.0|
| 123| 10| 2.0|
+------+---------+-----------+