为什么 left_anti join 在 pyspark 中没有按预期工作?
why left_anti join doesn't work as expected in pyspark?
在数据框中,我试图识别那些在 C2 列中具有值但在任何其他行的 C1 列中不存在的值的行。我尝试了以下代码:
in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
in_df.show()
+---+----+---+
| C1| C2| C3|
+---+----+---+
| 1|null| A|
| 2| 1| B|
| 3|null| C|
| 4| 11| D|
+---+----+---+
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
现在应用 left_anti 连接预计 return 只有第 4 行,但是我也得到第 2 行:
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
如果我 'materialize' 过滤后的 DF 结果如预期:
filtered = filtered.toDF(*filtered.columns)
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 4| 11| D|
+---+---+---+
为什么需要这个 .toDF?
in_df.C1
实际上是指 filtered
列,如以下代码所示:
in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
filtered = in_df.filter(in_df.C2.isNotNull()).select("C2")
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
Py4JJavaError: An error occurred while calling o699.join.
: org.apache.spark.sql.AnalysisException: cannot resolve 'in_df.C1
' given input columns: [C2, C1, C2, C3];;
'Join LeftAnti, ('in_df.C1 = 'filtered.C2)
:- Project [C2#891L]
: +- Filter isnotnull(C2#891L)
: +- LogicalRDD [C1#890L, C2#891L, C3#892]
+- LogicalRDD [C1#900L, C2#901L, C3#902]
所以基本上,当加入 2 个数据帧时,您实际使用的条件是 filtered.C1 == filtered.C2
:
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.join(in_df,(filtered.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
您可能已经更改了数据框的名称,但仍然可以通过调用 in_df.Ci
引用其中的列。为确保您引用的是正确的数据框,您可以使用别名:
import pyspark.sql.functions as psf
filtered.alias("filtered").join(in_df.alias("in_df"),(psf.col("in_df.C1") == psf.col("filtered.C2")), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 4| 11| D|
+---+---+---+
处理列名歧义的最佳方法是从一开始就避免它们(重命名列或为数据框使用别名)。
在数据框中,我试图识别那些在 C2 列中具有值但在任何其他行的 C1 列中不存在的值的行。我尝试了以下代码:
in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
in_df.show()
+---+----+---+
| C1| C2| C3|
+---+----+---+
| 1|null| A|
| 2| 1| B|
| 3|null| C|
| 4| 11| D|
+---+----+---+
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
现在应用 left_anti 连接预计 return 只有第 4 行,但是我也得到第 2 行:
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
如果我 'materialize' 过滤后的 DF 结果如预期:
filtered = filtered.toDF(*filtered.columns)
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 4| 11| D|
+---+---+---+
为什么需要这个 .toDF?
in_df.C1
实际上是指 filtered
列,如以下代码所示:
in_df = sqlContext.createDataFrame([[1,None,'A'],[2,1,'B'],[3,None,'C'],[4,11,'D']],['C1','C2','C3'])
filtered = in_df.filter(in_df.C2.isNotNull()).select("C2")
filtered.join(in_df,(in_df.C1 == filtered.C2), 'left_anti').show()
Py4JJavaError: An error occurred while calling o699.join. : org.apache.spark.sql.AnalysisException: cannot resolve '
in_df.C1
' given input columns: [C2, C1, C2, C3];; 'Join LeftAnti, ('in_df.C1 = 'filtered.C2) :- Project [C2#891L] : +- Filter isnotnull(C2#891L) : +- LogicalRDD [C1#890L, C2#891L, C3#892] +- LogicalRDD [C1#900L, C2#901L, C3#902]
所以基本上,当加入 2 个数据帧时,您实际使用的条件是 filtered.C1 == filtered.C2
:
filtered = in_df.filter(in_df.C2.isNotNull())
filtered.join(in_df,(filtered.C1 == filtered.C2), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 2| 1| B|
| 4| 11| D|
+---+---+---+
您可能已经更改了数据框的名称,但仍然可以通过调用 in_df.Ci
引用其中的列。为确保您引用的是正确的数据框,您可以使用别名:
import pyspark.sql.functions as psf
filtered.alias("filtered").join(in_df.alias("in_df"),(psf.col("in_df.C1") == psf.col("filtered.C2")), 'left_anti').show()
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 4| 11| D|
+---+---+---+
处理列名歧义的最佳方法是从一开始就避免它们(重命名列或为数据框使用别名)。