如何根据 pyspark 中的条件设置新标志?
How to set new flag based on condition in pyspark?
我有两个如下所示的数据框。
df = spark.createDataFrame(sc.parallelize([[1,1,2],[1,2,9], [2,1,2],[2,2,1],
[4,1,5],[4,2,6]]), ["sid","cid","Cr"])
df.show()
+---+---+---+
|sid|cid| Cr|
+---+---+---+
| 1| 1| 2|
| 1| 2| 9|
| 2| 1| 2|
| 2| 2| 1|
| 4| 1| 5|
| 4| 2| 6|
| 5| 1| 3|
| 5| 2| 8|
+---+---+---+
接下来我创建了 df1,如下所示。
df1 = spark.createDataFrame(sc.parallelize([[1,1],[1,2],[1,3], [2,1],[2,2],[2,3],[4,1],[4,2],[4,3],[5,1],[5,2],[5,3]]), ["sid","cid"])
df1.show()
+---+---+
|sid|cid|
+---+---+
| 1| 1|
| 1| 2|
| 1| 3|
| 2| 1|
| 2| 2|
| 2| 3|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 1|
| 5| 2|
| 5| 3|
+---+---+
现在我希望我的最终输出应该如下所示,即。如果提供任何数据,即
if (df1.sid==df.sid)&(df1.cid==df.cid) then flag value 1 else 0.
缺少的 Cr 值将为“0”
+---+---+---+----+
|sid|cid| Cr|flag|
+---+---+---+----+
| 1| 1| 2| 1 |
| 1| 2| 9| 1 |
| 1| 3| 0| 0 |
| 2| 1| 2| 1 |
| 2| 2| 1| 1 |
| 2| 3| 0| 0 |
| 4| 1| 5| 1 |
| 4| 2| 6| 1 |
| 4| 3| 0| 0 |
| 5| 1| 3| 1 |
| 5| 2| 8| 1 |
| 5| 3| 0| 0 |
+---+---+---+----+
请帮我解决这个问题。
有数据:
from pyspark.sql.functions import col, when, lit, coalesce
df = spark.createDataFrame(
[(1, 1, 2), (1, 2, 9), (2, 1, 2), (2, 2, 1), (4, 1, 5), (4, 2, 6), (5, 1, 3), (5, 2, 8)],
("sid", "cid", "Cr"))
df1 = spark.createDataFrame(
[[1,1],[1,2],[1,3], [2,1],[2,2],[2,3],[4,1],[4,2],[4,3],[5,1],[5,2],[5,3]],
["sid","cid"])
外连接:
joined = (df.alias("df")
.join(
df1.alias("df1"),
(col("df.sid") == col("df1.sid")) & (col("df.cid") == col("df1.cid")),
"rightouter"))
和select
joined.select(
col("df1.*"),
coalesce(col("Cr"), lit(0)).alias("Cr"),
col("df.sid").isNotNull().cast("integer").alias("flag")
).orderBy("sid", "cid").show()
# +---+---+---+----+
# |sid|cid| Cr|flag|
# +---+---+---+----+
# | 1| 1| 2| 1|
# | 1| 2| 9| 1|
# | 1| 3| 0| 0|
# | 2| 1| 2| 1|
# | 2| 2| 1| 1|
# | 2| 3| 0| 0|
# | 4| 1| 5| 1|
# | 4| 2| 6| 1|
# | 4| 3| 0| 0|
# | 5| 1| 3| 1|
# | 5| 2| 8| 1|
# | 5| 3| 0| 0|
# +---+---+---+----+
我有两个如下所示的数据框。
df = spark.createDataFrame(sc.parallelize([[1,1,2],[1,2,9], [2,1,2],[2,2,1],
[4,1,5],[4,2,6]]), ["sid","cid","Cr"])
df.show()
+---+---+---+
|sid|cid| Cr|
+---+---+---+
| 1| 1| 2|
| 1| 2| 9|
| 2| 1| 2|
| 2| 2| 1|
| 4| 1| 5|
| 4| 2| 6|
| 5| 1| 3|
| 5| 2| 8|
+---+---+---+
接下来我创建了 df1,如下所示。
df1 = spark.createDataFrame(sc.parallelize([[1,1],[1,2],[1,3], [2,1],[2,2],[2,3],[4,1],[4,2],[4,3],[5,1],[5,2],[5,3]]), ["sid","cid"])
df1.show()
+---+---+
|sid|cid|
+---+---+
| 1| 1|
| 1| 2|
| 1| 3|
| 2| 1|
| 2| 2|
| 2| 3|
| 4| 1|
| 4| 2|
| 4| 3|
| 5| 1|
| 5| 2|
| 5| 3|
+---+---+
现在我希望我的最终输出应该如下所示,即。如果提供任何数据,即 if (df1.sid==df.sid)&(df1.cid==df.cid) then flag value 1 else 0. 缺少的 Cr 值将为“0”
+---+---+---+----+
|sid|cid| Cr|flag|
+---+---+---+----+
| 1| 1| 2| 1 |
| 1| 2| 9| 1 |
| 1| 3| 0| 0 |
| 2| 1| 2| 1 |
| 2| 2| 1| 1 |
| 2| 3| 0| 0 |
| 4| 1| 5| 1 |
| 4| 2| 6| 1 |
| 4| 3| 0| 0 |
| 5| 1| 3| 1 |
| 5| 2| 8| 1 |
| 5| 3| 0| 0 |
+---+---+---+----+
请帮我解决这个问题。
有数据:
from pyspark.sql.functions import col, when, lit, coalesce
df = spark.createDataFrame(
[(1, 1, 2), (1, 2, 9), (2, 1, 2), (2, 2, 1), (4, 1, 5), (4, 2, 6), (5, 1, 3), (5, 2, 8)],
("sid", "cid", "Cr"))
df1 = spark.createDataFrame(
[[1,1],[1,2],[1,3], [2,1],[2,2],[2,3],[4,1],[4,2],[4,3],[5,1],[5,2],[5,3]],
["sid","cid"])
外连接:
joined = (df.alias("df")
.join(
df1.alias("df1"),
(col("df.sid") == col("df1.sid")) & (col("df.cid") == col("df1.cid")),
"rightouter"))
和select
joined.select(
col("df1.*"),
coalesce(col("Cr"), lit(0)).alias("Cr"),
col("df.sid").isNotNull().cast("integer").alias("flag")
).orderBy("sid", "cid").show()
# +---+---+---+----+
# |sid|cid| Cr|flag|
# +---+---+---+----+
# | 1| 1| 2| 1|
# | 1| 2| 9| 1|
# | 1| 3| 0| 0|
# | 2| 1| 2| 1|
# | 2| 2| 1| 1|
# | 2| 3| 0| 0|
# | 4| 1| 5| 1|
# | 4| 2| 6| 1|
# | 4| 3| 0| 0|
# | 5| 1| 3| 1|
# | 5| 2| 8| 1|
# | 5| 3| 0| 0|
# +---+---+---+----+