IF Statement Pyspark
My data looks like this:
+----------+-------------+-------+--------------------+--------------+---+
|purch_date|  purch_class|tot_amt|       serv-provider|purch_location| id|
+----------+-------------+-------+--------------------+--------------+---+
|03/11/2017|Uncategorized| -17.53|               HOVER|              |  0|
|02/11/2017|    Groceries| -70.05|1774 MAC'S CONVEN...|      BRAMPTON|  1|
|31/10/2017|Gasoline/Fuel|    -20|                ESSO|              |  2|
|31/10/2017|       Travel|     -9|TORONTO PARKING A...|       TORONTO|  3|
|30/10/2017|    Groceries|  -1.84|         LONGO'S # 2|              |  4|
+----------+-------------+-------+--------------------+--------------+---+
I am trying to create a binary column whose value is defined by the tot_amt column, and to add it to the data above. If tot_amt < (-50) I want it to return 1 in the new column, and if tot_amt > (-50) to return 0.
My current attempt:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def y(row):
    if row['tot_amt'] < (-50):
        val = 1
    else:
        val = 0
    return val

y_udf = udf(y, IntegerType())
df_7 = df_4.withColumn('Y', y_udf(df_4['tot_amt'], (df_4['purch_class'],
    (df_4['purch_date'], (df_4['serv-provider'], (df_4['purch_location']))))))
display(df_7)
The error message I receive:
SparkException: Job aborted due to stage failure: Task 0 in stage 67.0 failed
1 times, most recent failure: Lost task 0.0 in stage 67.0 (TID 85, localhost,
executor driver): org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 177, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/databricks/spark/python/pyspark/worker.py", line 104, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/databricks/spark/python/pyspark/worker.py", line 71, in <lambda>
return lambda *a: f(*a)
TypeError: y() takes exactly 1 argument (2 given)
How to make it work (via struct): y is defined with a single parameter, but the call above hands the UDF more than one argument, which is exactly what the traceback complains about. Pack the columns you need into a single struct and pass that instead:
from pyspark.sql.functions import struct

df_4.withColumn("y", y_udf(
    # Include the columns you want
    struct(df_4['tot_amt'], df_4['purch_class'])
))
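For completeness, a minimal end-to-end sketch of that fix, assuming df_4 is the DataFrame from the question: the struct arrives in the UDF as a single Row, so the original dictionary-style field access still works.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import struct, udf

def y(row):
    # row is a single Row carrying the fields packed into the struct
    if row['tot_amt'] < (-50):
        return 1
    return 0

y_udf = udf(y, IntegerType())

# One struct argument matches y's single parameter
df_7 = df_4.withColumn('Y', y_udf(struct(df_4['tot_amt'], df_4['purch_class'])))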
What makes more sense, since only tot_amt is actually used:
y_udf = udf(lambda y: 1 if y < -50 else 0, IntegerType())
df_4.withColumn("y", y_udf('tot_amt'))
How it should be done:
from pyspark.sql.functions import when
df_4.withColumn("y", when(df_4['tot_amt'] < -50, 1).otherwise(0))
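One detail worth knowing about when/otherwise (not an issue in the sample data, which has no nulls): a comparison against NULL evaluates to NULL, so a NULL tot_amt falls through to otherwise(0) and is silently labelled 0. If you would rather keep such rows as NULL, drop the otherwise and spell out both branches, as in this sketch:

# Sketch: a NULL tot_amt stays NULL instead of defaulting to 0
df_4.withColumn("y", when(df_4['tot_amt'] < -50, 1)
                     .when(df_4['tot_amt'] >= -50, 0))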
You don't need a UDF for this - use the built-in function when instead. Here is an example with toy data similar to your tot_amt column:
spark.version
# u'2.2.0'
from pyspark.sql import Row
from pyspark.sql.functions import col, when
df = spark.createDataFrame([Row(-17.53),
                            Row(-70.05),
                            Row(-20.),
                            Row(-9.),
                            Row(-1.84)],
                           ["tot_amt"])
df.show()
# +-------+
# |tot_amt|
# +-------+
# | -17.53|
# | -70.05|
# | -20.0|
# | -9.0|
# | -1.84|
# +-------+
df.withColumn('Y', when(col('tot_amt') < -50., 1).otherwise(0)).show()
# +-------+---+
# |tot_amt| Y|
# +-------+---+
# | -17.53| 0|
# | -70.05| 1|
# | -20.0| 0|
# | -9.0| 0|
# | -1.84| 0|
# +-------+---+
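The same condition can also be written as a SQL CASE expression via the built-in expr, which is equivalent to the when/otherwise version; a quick sketch against the toy df above:

from pyspark.sql.functions import expr

# CASE WHEN is the SQL spelling of when(...).otherwise(...)
df.withColumn('Y', expr("CASE WHEN tot_amt < -50 THEN 1 ELSE 0 END")).show()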