Spark SQL 与过滤器连接的 DataFrame 无法正常工作
Spark SQL DataFrame join with filter is not working
我正在尝试通过基于某些列加入 df2 来过滤 df1,然后根据过滤器从 df1 中过滤一些行。
df1:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| green|
|Rapid Cash Plus| green|
| DOTOPAL| green|
| RAPID CASH| green|
df2:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| blue|
|Rapid Cash Plus| blue|
| DOTOPAL| blue|
+---------------+----------+
示例代码为:
df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
.filter(not(df1.col("rag_status") === "green"))
.select(df1.col("channel"), df1.col("rag_status")).show
它没有 return 任何记录。
我期望输出如下所示,这是根据 channel
和 green
状态条件过滤记录后从 df1 编辑的 return。如果 df2 中有相同的频道并且 df1 rag_status
为绿色,则从 df1 中删除该记录,并且 return 仅从 df1 中删除剩余记录。
预期输出为:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| RAPID CASH| green|
你可以这样工作:
val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
val leftJoinResult=df1.join(df2,Array("channel"),"left")
val innerJoinResult=df1.join(df2,"channel")
val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show
Spark-shell输出:
scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]
scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]
scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]
scala> resultDF.show
+----------+----------+
| channel|rag_status|
+----------+----------+
|RAPID CASH| green|
+----------+----------+
您可以使用以下代码获得预期的输出:
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(3) != "blue")
我正在尝试通过基于某些列加入 df2 来过滤 df1,然后根据过滤器从 df1 中过滤一些行。
df1:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| green|
|Rapid Cash Plus| green|
| DOTOPAL| green|
| RAPID CASH| green|
df2:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| STS| blue|
|Rapid Cash Plus| blue|
| DOTOPAL| blue|
+---------------+----------+
示例代码为:
df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
.filter(not(df1.col("rag_status") === "green"))
.select(df1.col("channel"), df1.col("rag_status")).show
它没有 return 任何记录。
我期望输出如下所示,这是根据 channel
和 green
状态条件过滤记录后从 df1 编辑的 return。如果 df2 中有相同的频道并且 df1 rag_status
为绿色,则从 df1 中删除该记录,并且 return 仅从 df1 中删除剩余记录。
预期输出为:
+---------------+----------+
| channel|rag_status|
+---------------+----------+
| RAPID CASH| green|
你可以这样工作:
val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
val leftJoinResult=df1.join(df2,Array("channel"),"left")
val innerJoinResult=df1.join(df2,"channel")
val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show
Spark-shell输出:
scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]
scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]
scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]
scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]
scala> resultDF.show
+----------+----------+
| channel|rag_status|
+----------+----------+
|RAPID CASH| green|
+----------+----------+
您可以使用以下代码获得预期的输出:
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(3) != "blue")