Using a when statement in PySpark - not working when I add it to various parts of the script
I am new to PySpark but have managed to get the below working.
I have two more requirements, both of which I would implement with a CASE statement in SQL.
I have tried the following:
sqlfunc\
    .when((df5.time_minute > 0) & (df5.time_minute < 16), "Q1")\
    .when((df5.time_minute > 15) & (df5.time_minute < 31), "Q2")\
    .when((df5.time_minute > 30) & (df5.time_minute < 46), "Q3")\
    .when((df5.time_minute > 45) & (df5.time_minute < 61), "Q4")\
    .otherwise("Unknown")\
    .alias("Quarter")
I have tried adding it as a withColumn() condition and also inside a select. Either way, it does not create a new column holding the results.
Can anyone show me how to add the case statement to the script so that the output has a new column? As I said, I have already tried:
- withColumn('ColumnName', when statement....)
- inside a select('field1', 'field2', when statement)
Any help would be great.
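For reference, here is a minimal, self-contained sketch of the pattern that attaches such a when chain as a new column. The toy DataFrame and its time_minute values are hypothetical, standing in for the real df5:

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc

spark = SparkSession.builder.appName('when_demo').getOrCreate()

# Hypothetical rows standing in for the real data.
df = spark.createDataFrame([(5,), (20,), (40,), (59,)], ['time_minute'])

# The when chain is just a Column object; build it once and reuse it.
quarter = sqlfunc.when((df.time_minute > 0) & (df.time_minute < 16), 'Q1')\
    .when((df.time_minute > 15) & (df.time_minute < 31), 'Q2')\
    .when((df.time_minute > 30) & (df.time_minute < 46), 'Q3')\
    .when((df.time_minute > 45) & (df.time_minute < 61), 'Q4')\
    .otherwise('Unknown')

# Option 1: withColumn adds it alongside every existing column.
df.withColumn('Quarter', quarter).show()

# Option 2: inside a select, give it a name with alias.
df.select('time_minute', quarter.alias('Quarter')).show()

One common cause of "no new column appears" is calling the chain without assigning the result: DataFrames are immutable, so withColumn and select each return a new DataFrame that must be captured.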
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from datetime import datetime

# create a session that supports Hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://IP:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session
### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    dt_now = datetime.now()
    # int() works on both Python 2 and 3 (long() is Python 2 only)
    today_unixtime = int(dt_now.strftime('%s'))
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')
    twoday_unixtime = int(dt_now.strftime('%s')) - 24*60*60*2
    twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')
    hourago = int(dt_now.strftime('%s')) - 60*60*4
    hrdate = datetime.fromtimestamp(hourago).strftime('%H')
    schema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', StringType(), True),
        StructField('field4', LongType(), True)
    ]
    final_structure = StructType(schema)
    df1 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://directory/dt=%s/*/*/*' % today_date, final_structure)
    usercatschema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', StringType(), True)
    ]
    usercat_structure = StructType(usercatschema)
    df2 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://directory/dt=%s/*' % twoday, usercat_structure)
    df3 = df2.select('field1', 'field2', 'field3')
    df4 = df1.join(df3, (df1.field1 == df3.field1) & (sqlfunc.substring(df1.field2, 0, 14) == df3.field2), "left")
    df5 = df4\
        .coalesce(1000)\
        .select('df1.field1', 'df2.field1', ......)\
        .groupBy('field1', 'field2'....)\
        .agg(
            sqlfunc.sum(df4.field1).alias('upload'),
            sqlfunc.sum(df4.field2).alias('download'),
            sqlfunc.countDistinct(df4.field3).alias('distinct_field3'),
            sqlfunc.count(df4.field4).alias('field4')
        )\
        .select('field1......)
    df5.show()
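As an aside, strftime('%s') is not a standard format code and is platform-dependent; the same date arithmetic can be done with timedelta instead of unix-time round-trips. A minimal sketch, using only the standard library:

from datetime import datetime, timedelta

dt_now = datetime.now()
today_date = dt_now.strftime('%Y%m%d')                    # today, e.g. '20240101'
twoday = (dt_now - timedelta(days=2)).strftime('%Y%m%d')  # date two days ago
hrdate = (dt_now - timedelta(hours=4)).strftime('%H')     # hour of day, four hours ago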
Here is the working script (note that 'quarter' has to be kept in the intermediate select so the groupBy below can resolve it):
df5 = df4\
    .coalesce(1000)\
    .withColumn('quarter',
        sqlfunc.when((df4.time_minute > -1) & (df4.time_minute < 16), 1)
        .when((df4.time_minute > 15) & (df4.time_minute < 31), 2)
        .when((df4.time_minute > 30) & (df4.time_minute < 46), 3)
        .when((df4.time_minute > 45) & (df4.time_minute < 61), 4)
        .otherwise(5))\
    .select('field1', 'field2', 'date', 'time_hour', 'time_minute', 'quarter')\
    .groupBy('date', 'time_hour', 'quarter')\
    .agg(
        sqlfunc.sum(df4.field1).alias('sumfield1'),
        sqlfunc.sum(df4.field2).alias('sumfield2')
    )\
    .select('date', 'time_hour', 'quarter', 'sumfield1', 'sumfield2')
df5.show()
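Since the two remaining requirements were originally going to be CASE statements in SQL, the same column can also be written as a literal CASE expression through sqlfunc.expr. A sketch under the same df4/time_minute assumptions as above:

quarter_expr = sqlfunc.expr("""
    CASE WHEN time_minute BETWEEN 0  AND 15 THEN 'Q1'
         WHEN time_minute BETWEEN 16 AND 30 THEN 'Q2'
         WHEN time_minute BETWEEN 31 AND 45 THEN 'Q3'
         WHEN time_minute BETWEEN 46 AND 60 THEN 'Q4'
         ELSE 'Unknown' END""")

df_with_quarter = df4.withColumn('Quarter', quarter_expr)

Which form to prefer is mostly taste: expr keeps the SQL readable as SQL, while the when chain keeps everything in Python.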