Pyspark- case 语句中的子查询
Pyspark- Subquery in a case statement
我正在尝试 运行 Pyspark 中 case 语句中的子查询,它抛出异常。如果一个 table 中的 id 存在于另一个 table.
中,我正在尝试创建一个新标志
谁能告诉我这在 pyspark 中是否可行?
temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
这里是错误:
AnalysisException: 'Predicate sub-queries can only be used in a Filter
This 似乎是有关子查询的最新详细文档 - 它与 Spark 2.0 相关,但从那以后我还没有看到这方面的重大更新。
该参考文献中的链接笔记本清楚地表明,实际上谓词子查询目前仅在 WHERE 子句中受支持。
即这会起作用(但当然不会产生预期的结果):
spark.sql("select * from main_table where id in (select distinct id from ids_table)")
您可以通过使用左 JOIN 获得相同的结果 - 这就是 IN 子查询通常被翻译成的结果(有关更多详细信息,请参阅上述链接的笔记本)。
例如:
# set up some data
l1 = [('Alice', 1), ('Bob', 2), ('Eve', 3)]
df1 = sql_sc.createDataFrame(l1, ['name', 'id'])
l2 = [(1,), (2,)]
df2 = sql_sc.createDataFrame(l2, ['id'])
df1.createOrReplaceTempView("main_table")
df2.createOrReplaceTempView("ids_table")
# use a left join
spark.sql("select * from main_table m left join ids_table d on (m.id=d.id)") \
.withColumn('flag', func.when(func.col('d.id').isNull(), 0).otherwise(1)) \
.drop('id').collect()
# result:
[Row(name='Bob', flag=1), Row(name='Eve', flag=0), Row(name='Alice', flag=1)]
或者,使用 pyspark sql 函数而不是 sql 语法:
df2 = df2.withColumnRenamed('id', 'id_faux')
df1.join(df2, df1.id == df2.id_faux, how='left') \
.withColumn('flag', func.when(func.col('id_faux').isNull(), 0).otherwise(1)).drop('id_faux').collect()
我正在尝试 运行 Pyspark 中 case 语句中的子查询,它抛出异常。如果一个 table 中的 id 存在于另一个 table.
中,我正在尝试创建一个新标志谁能告诉我这在 pyspark 中是否可行?
temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
这里是错误:
AnalysisException: 'Predicate sub-queries can only be used in a Filter
This 似乎是有关子查询的最新详细文档 - 它与 Spark 2.0 相关,但从那以后我还没有看到这方面的重大更新。
该参考文献中的链接笔记本清楚地表明,实际上谓词子查询目前仅在 WHERE 子句中受支持。 即这会起作用(但当然不会产生预期的结果):
spark.sql("select * from main_table where id in (select distinct id from ids_table)")
您可以通过使用左 JOIN 获得相同的结果 - 这就是 IN 子查询通常被翻译成的结果(有关更多详细信息,请参阅上述链接的笔记本)。
例如:
# set up some data
l1 = [('Alice', 1), ('Bob', 2), ('Eve', 3)]
df1 = sql_sc.createDataFrame(l1, ['name', 'id'])
l2 = [(1,), (2,)]
df2 = sql_sc.createDataFrame(l2, ['id'])
df1.createOrReplaceTempView("main_table")
df2.createOrReplaceTempView("ids_table")
# use a left join
spark.sql("select * from main_table m left join ids_table d on (m.id=d.id)") \
.withColumn('flag', func.when(func.col('d.id').isNull(), 0).otherwise(1)) \
.drop('id').collect()
# result:
[Row(name='Bob', flag=1), Row(name='Eve', flag=0), Row(name='Alice', flag=1)]
或者,使用 pyspark sql 函数而不是 sql 语法:
df2 = df2.withColumnRenamed('id', 'id_faux')
df1.join(df2, df1.id == df2.id_faux, how='left') \
.withColumn('flag', func.when(func.col('id_faux').isNull(), 0).otherwise(1)).drop('id_faux').collect()