Apache spark 处理案例陈述
Apache spark dealing with case statements
我正在处理将 SQL 代码转换为 PySpark 代码并遇到一些 SQL 语句。我不知道如何处理 pyspark 中的案例陈述?我计划创建一个 RDD,然后使用 rdd.map 然后进行一些逻辑检查。这是正确的做法吗?请帮忙!
基本上我需要遍历 RDD 或 DF 中的每一行,并根据一些逻辑我需要编辑其中一个列值。
case
when (e."a" Like 'a%' Or e."b" Like 'b%')
And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'
when (e."a" Like 'b%' Or e."b" Like 'a%')
And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'
else
'CallitC'
我不擅长python。但会尝试给出一些我在 scala 中所做的事情的指示。
Question : rdd.map
and then do some logic checks. Is that the right approach?
这是一种方法。
withColumn
is another approach
在这种情况下你必须处理 Column
通过 - spark udf 或其他语法
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+--------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
+-----+--------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+--------------------------------------------------------+
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+---------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
+-----+---------------------------------+
|Alice| 0|
| Bob| 1|
+-----+---------------------------------+
您也可以使用 udf 代替 when
otherwise
。
这些是在pyspark
.
中写If-Else
/ When-Then-Else
/ When-Otherwise
表达式的几种方法
示例数据帧
df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])
df.show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| 1|
#| 2| 2|
#| 3| 3|
#+---+-----+
#Desired Output:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#| 1| 1| one|
#| 2| 2| two|
#| 3| 3| other|
#+---+-----+----------+
选项#1:withColumn()
使用 when-otherwise
from pyspark.sql.functions import when
df.withColumn("value_desc",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other')).show()
选项#2:select()
使用 when-otherwise
from pyspark.sql.functions import when
df.select("*",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other').alias('value_desc')).show()
Option3: selectExpr()
使用 SQL 等效 CASE 表达式
df.selectExpr("*","CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc").show()
SQL等表达式也可以用pyspark.sql.functions.expr函数写成withColumn()
和select()
。以下是示例。
Option4: select()
使用 expr 函数
from pyspark.sql.functions import expr
df.select("*",expr("CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc")).show()
Option5: withColumn()
使用 expr 函数
from pyspark.sql.functions import expr
df.withColumn("value_desc",expr("CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc")).show()
输出:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#| 1| 1| one|
#| 2| 2| two|
#| 3| 3| other|
#+---+-----+----------+
我正在处理将 SQL 代码转换为 PySpark 代码并遇到一些 SQL 语句。我不知道如何处理 pyspark 中的案例陈述?我计划创建一个 RDD,然后使用 rdd.map 然后进行一些逻辑检查。这是正确的做法吗?请帮忙!
基本上我需要遍历 RDD 或 DF 中的每一行,并根据一些逻辑我需要编辑其中一个列值。
case
when (e."a" Like 'a%' Or e."b" Like 'b%')
And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'
when (e."a" Like 'b%' Or e."b" Like 'a%')
And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'
else
'CallitC'
我不擅长python。但会尝试给出一些我在 scala 中所做的事情的指示。
Question :
rdd.map
and then do some logic checks. Is that the right approach?
这是一种方法。
withColumn
is another approach
在这种情况下你必须处理 Column
通过 - spark udf 或其他语法
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+--------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
+-----+--------------------------------------------------------+
|Alice| -1|
| Bob| 1|
+-----+--------------------------------------------------------+
from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+---------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
+-----+---------------------------------+
|Alice| 0|
| Bob| 1|
+-----+---------------------------------+
您也可以使用 udf 代替 when
otherwise
。
这些是在pyspark
.
If-Else
/ When-Then-Else
/ When-Otherwise
表达式的几种方法
示例数据帧
df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])
df.show()
#+---+-----+
#| id|value|
#+---+-----+
#| 1| 1|
#| 2| 2|
#| 3| 3|
#+---+-----+
#Desired Output:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#| 1| 1| one|
#| 2| 2| two|
#| 3| 3| other|
#+---+-----+----------+
选项#1:withColumn()
使用 when-otherwise
from pyspark.sql.functions import when
df.withColumn("value_desc",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other')).show()
选项#2:select()
使用 when-otherwise
from pyspark.sql.functions import when
df.select("*",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other').alias('value_desc')).show()
Option3: selectExpr()
使用 SQL 等效 CASE 表达式
df.selectExpr("*","CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc").show()
SQL等表达式也可以用pyspark.sql.functions.expr函数写成withColumn()
和select()
。以下是示例。
Option4: select()
使用 expr 函数
from pyspark.sql.functions import expr
df.select("*",expr("CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc")).show()
Option5: withColumn()
使用 expr 函数
from pyspark.sql.functions import expr
df.withColumn("value_desc",expr("CASE WHEN value == 1 THEN 'one' WHEN value == 2 THEN 'two' ELSE 'other' END AS value_desc")).show()
输出:
#+---+-----+----------+
#| id|value|value_desc|
#+---+-----+----------+
#| 1| 1| one|
#| 2| 2| two|
#| 3| 3| other|
#+---+-----+----------+