Spark SQL DataFrame join with filter is not working

I am trying to filter df1 by joining it to df2 on certain columns, and then to remove some rows from df1 based on that filter.

df1:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|     green|
|Rapid Cash Plus|     green|
|        DOTOPAL|     green|
|     RAPID CASH|     green|
+---------------+----------+

df2:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|      blue|
|Rapid Cash Plus|      blue|
|        DOTOPAL|      blue|
+---------------+----------+

The sample code is:

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
  .filter(not(df1.col("rag_status") === "green"))
  .select(df1.col("channel"), df1.col("rag_status")).show

It does not return any records.

I expect the output shown below: the records returned from df1 after filtering on the channel and green-status condition. If the same channel exists in df2 and the df1 rag_status is green, that record should be removed from df1, and only the remaining records from df1 should be returned.

The expected output is:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+

Your filter returns no records because every rag_status in df1 is green, so not(df1.col("rag_status") === "green") matches nothing after the join. You can make it work like this: take the left join of df1 with df2 (all df1 rows), take the inner join (only the matched rows), and subtract the matches with except, leaving the df1 rows with no matching channel in df2:

// df1 keeps only the green rows; df2's rag_status is renamed to avoid an ambiguous column after the join.
val df1 = sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status" === "green")
val df2 = sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
// The left join keeps every df1 row; the inner join keeps only the matched channels.
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
// except subtracts the matches, leaving the df1 rows whose channel is absent from df2.
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show

Spark-shell output:

scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]

scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]

scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]

scala> resultDF.show
+----------+----------+                                                         
|   channel|rag_status|
+----------+----------+
|RAPID CASH|     green|
+----------+----------+
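
Equivalently, a left anti join expresses the subtraction in a single step. A minimal sketch, assuming Spark 2.0 or later (where the left_anti join type is available) and the same df1/df2 as above:

// left_anti keeps only the df1 rows whose channel has no match in df2,
// so the intermediate inner join and except are unnecessary.
df1.join(df2, Seq("channel"), "left_anti").show
// +----------+----------+
// |   channel|rag_status|
// +----------+----------+
// |RAPID CASH|     green|
// +----------+----------+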

You can also get the expected output with the following code:

df1.join(df2, Seq("channel"), "leftouter")
  .filter(row => row(2) != "blue") // index 2 is df2's rag_status; it is null for unmatched rows, so they pass the filter
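
If you prefer not to rely on a positional index, a null check makes the intent explicit. A sketch assuming the original df1/df2 from the question: after a left outer join, the df2-side column is null exactly for the unmatched channels.

import org.apache.spark.sql.functions.col

df1.join(df2.withColumnRenamed("rag_status", "rag_status2"), Seq("channel"), "leftouter")
  .filter(col("rag_status2").isNull)   // null means no matching channel in df2
  .select("channel", "rag_status")     // keep only df1's columns
  .show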