pyspark sqlfunction expr 函数没有按预期工作?
pyspark sqlfunction expr function not working as expected?
pyspark sqlfunction expr 未按预期工作。
我的 test1.txt 包含
101|10|4
101|12|1
101|13|3
101|14|2
我的 test2.txt 包含
101|10|4
101|11|1
101|13|3
101|14|2
我使用上面的数据创建了两个数据框,如下面的代码。
df3 = spark.createDataFrame(sc.textFile("C://Users//cravi//Desktop//test1.txt").map( lambda x: x.split("|")[:3]),["cid","pid","pr"])
df4 = spark.createDataFrame(sc.textFile("C://Users//cravi//Desktop//test2.txt").map( lambda x: x.split("|")[:3]),["cid","pid","p"])
df5=df4.withColumnRenamed("p", "p")\
.join(df3.withColumnRenamed("pr", "Pr")\
, ["cid", "pid"], "outer")\
.na.fill(0)
tt=df5.withColumn('flag', sf.expr("case when p>0 and pr=='null' then 'N'\
when p=0 and Pr>0 then 'D'\
when p=Pr then 'R'\
else 'U' end"))
tt.show()
我得到如下输出
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| U|
|101| 12|null| 1| U|
|101| 13| 3| 3| R|
+---+---+----+----+----+
pyspark sqlfunction expr 未按预期工作。
如果 p
和 pr
相同,那么我的 falg
将是 'R'
。
如果 p
某个值并且 pr
为空,我的 flag
将是 'N'
如果 p
为空并且 pr
是某个值,我的标志是 'D'
其他情况我的标志是'U'
在这种情况下,预期输出是:
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| N|
|101| 12|null| 1| D|
|101| 13| 3| 3| R|
+---+---+----+----+----+
isNull
和 isNotNull
内置函数 应该可以解决您的问题,可以在查询中使用
tt=df5.withColumn('flag', sf.expr("case when isNotNull(`p`) and isNull(`pr`) then 'N'\
when isNull(`p`) and isNotNull(`Pr`) then 'D'\
when p=Pr then 'R'\
else 'U' end"))
因此你应该得到
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| N|
|101| 12|null| 1| D|
|101| 13| 3| 3| R|
+---+---+----+----+----+
注意: na.fill(0)
没有用,因为它没有应用,因为列是 StringType()
希望回答对你有帮助
pyspark sqlfunction expr 未按预期工作。
我的 test1.txt 包含
101|10|4
101|12|1
101|13|3
101|14|2
我的 test2.txt 包含
101|10|4
101|11|1
101|13|3
101|14|2
我使用上面的数据创建了两个数据框,如下面的代码。
df3 = spark.createDataFrame(sc.textFile("C://Users//cravi//Desktop//test1.txt").map( lambda x: x.split("|")[:3]),["cid","pid","pr"])
df4 = spark.createDataFrame(sc.textFile("C://Users//cravi//Desktop//test2.txt").map( lambda x: x.split("|")[:3]),["cid","pid","p"])
df5=df4.withColumnRenamed("p", "p")\
.join(df3.withColumnRenamed("pr", "Pr")\
, ["cid", "pid"], "outer")\
.na.fill(0)
tt=df5.withColumn('flag', sf.expr("case when p>0 and pr=='null' then 'N'\
when p=0 and Pr>0 then 'D'\
when p=Pr then 'R'\
else 'U' end"))
tt.show()
我得到如下输出
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| U|
|101| 12|null| 1| U|
|101| 13| 3| 3| R|
+---+---+----+----+----+
pyspark sqlfunction expr 未按预期工作。
如果 p
和 pr
相同,那么我的 falg
将是 'R'
。
如果 p
某个值并且 pr
为空,我的 flag
将是 'N'
如果 p
为空并且 pr
是某个值,我的标志是 'D'
其他情况我的标志是'U'
在这种情况下,预期输出是:
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| N|
|101| 12|null| 1| D|
|101| 13| 3| 3| R|
+---+---+----+----+----+
isNull
和 isNotNull
内置函数 应该可以解决您的问题,可以在查询中使用
tt=df5.withColumn('flag', sf.expr("case when isNotNull(`p`) and isNull(`pr`) then 'N'\
when isNull(`p`) and isNotNull(`Pr`) then 'D'\
when p=Pr then 'R'\
else 'U' end"))
因此你应该得到
+---+---+----+----+----+
|cid|pid| p| Pr|flag|
+---+---+----+----+----+
|101| 14| 2| 2| R|
|101| 10| 4| 4| R|
|101| 11| 1|null| N|
|101| 12|null| 1| D|
|101| 13| 3| 3| R|
+---+---+----+----+----+
注意: na.fill(0)
没有用,因为它没有应用,因为列是 StringType()
希望回答对你有帮助