PySpark:数据并不总是符合模式 - 改变数据的逻辑
PySpark: data doesn't always conform to schema - logic to alter data
我是 PySpark 的新手,正在编写脚本,读取 .csv
个文件。
我已经在下面明确定义了架构,并且脚本在大多数情况下都能完美运行。
问题是,有时,一个值进入不符合模式的文件 - 例如'-' 可能出现在整数字段中,因此,我们会收到类型错误 - 当脚本中达到 df1.show()
时抛出错误。
我正在想办法有效地说 - 如果该值与定义的数据类型不匹配,则替换为 ''
有人知道这是否可行吗?任何建议都会很棒!
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
#create a context that supports hive
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.master('yarn')\
.config("hive.metastore.uris", "thrift://serverip:9083")\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('testing_files')
dt_now = datetime.now()
today_unixtime = long(dt_now.strftime('%s'))
today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')
twoday_unixtime = long(dt_now.strftime('%s')) - 24*60*60*2
twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')
hourago = long(dt_now.strftime('%s')) - 60*60*4
hrdate = datetime.fromtimestamp(hourago).strftime('%H')
schema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3',IntegerType(), True) \
]
final_structure = StructType(schema)
df1 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://hdfspath/dt=%s/*/*/*' %today_date, final_structure)
usercatschema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3',StringType(), True) \
]
usercat_structure = StructType(usercatschema)
df2 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://hdfspath/v0/dt=%s/*' %twoday, usercat_structure)
df1.show()
df2.show()
df1.createOrReplaceTempView("dpi")
df2.createOrReplaceTempView("usercat")
finaldf = spark_session.sql('''
SQL QUERY
''')
finaldf.coalesce(10).write.format("com.databricks.spark.csv").option("header", "true").option('sep', '\t').mode('append').save('hdfs://hdfs path')
读取为String类型,然后转为int。
df.withColumn("field3",df.field3.cast("int"))
我是 PySpark 的新手,正在编写脚本,读取 .csv
个文件。
我已经在下面明确定义了架构,并且脚本在大多数情况下都能完美运行。
问题是,有时,一个值进入不符合模式的文件 - 例如'-' 可能出现在整数字段中,因此,我们会收到类型错误 - 当脚本中达到 df1.show()
时抛出错误。
我正在想办法有效地说 - 如果该值与定义的数据类型不匹配,则替换为 ''
有人知道这是否可行吗?任何建议都会很棒!
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
#create a context that supports hive
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.master('yarn')\
.config("hive.metastore.uris", "thrift://serverip:9083")\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('testing_files')
dt_now = datetime.now()
today_unixtime = long(dt_now.strftime('%s'))
today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')
twoday_unixtime = long(dt_now.strftime('%s')) - 24*60*60*2
twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')
hourago = long(dt_now.strftime('%s')) - 60*60*4
hrdate = datetime.fromtimestamp(hourago).strftime('%H')
schema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3',IntegerType(), True) \
]
final_structure = StructType(schema)
df1 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://hdfspath/dt=%s/*/*/*' %today_date, final_structure)
usercatschema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3',StringType(), True) \
]
usercat_structure = StructType(usercatschema)
df2 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://hdfspath/v0/dt=%s/*' %twoday, usercat_structure)
df1.show()
df2.show()
df1.createOrReplaceTempView("dpi")
df2.createOrReplaceTempView("usercat")
finaldf = spark_session.sql('''
SQL QUERY
''')
finaldf.coalesce(10).write.format("com.databricks.spark.csv").option("header", "true").option('sep', '\t').mode('append').save('hdfs://hdfs path')
读取为String类型,然后转为int。
df.withColumn("field3",df.field3.cast("int"))