PySpark: data doesn't always conform to schema - logic to alter data

I'm new to PySpark and am writing a script that reads .csv files.

I've explicitly defined the schema below, and the script runs perfectly most of the time.

The problem is that occasionally a value lands in a file that doesn't conform to the schema - for example, '-' may appear in an integer field - so we get a type error, thrown when the script reaches df1.show().

I'm trying to find a way to say, effectively: if a value doesn't match the defined data type, replace it with ''.

Does anyone know if this is possible? Any suggestions would be great!

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
# create a Spark session that supports Hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://serverip:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    dt_now = datetime.now()

    # current time as epoch seconds; note strftime('%s') is platform-dependent
    today_unixtime = int(dt_now.strftime('%s'))
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')

    # date two days ago, formatted YYYYMMDD
    twoday_unixtime = int(dt_now.strftime('%s')) - 24*60*60*2
    twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')

    # hour of day four hours ago, formatted HH
    hourago = int(dt_now.strftime('%s')) - 60*60*4
    hrdate = datetime.fromtimestamp(hourago).strftime('%H')

    schema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', IntegerType(), True)
    ]
    final_structure = StructType(schema)

    df1 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/dt=%s/*/*/*' % today_date, final_structure)

    usercatschema = [
        StructField('field1', StringType(), True),
        StructField('field2', StringType(), True),
        StructField('field3', StringType(), True)
    ]
    usercat_structure = StructType(usercatschema)

    df2 = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/v0/dt=%s/*' % twoday, usercat_structure)

    df1.show()
    df2.show()
    df1.createOrReplaceTempView("dpi")
    df2.createOrReplaceTempView("usercat")

    finaldf = spark_session.sql('''
    SQL QUERY
''')
    finaldf.coalesce(10)\
        .write.format("com.databricks.spark.csv")\
        .option("header", "true")\
        .option('sep', '\t')\
        .mode('append')\
        .save('hdfs://hdfs path')

Read the problematic column as StringType first, then cast it to int. Values that can't be cast (such as '-') become null instead of raising an error.

df.withColumn("field3", df.field3.cast("int"))
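
To make this concrete, below is a minimal, self-contained sketch of that approach. It assumes the same tab-delimited layout as the question; the app name, path, and column names are illustrative placeholders.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName('cast_example').getOrCreate()

# declare field3 as StringType so malformed rows load without error
schema = StructType([
    StructField('field1', StringType(), True),
    StructField('field2', StringType(), True),
    StructField('field3', StringType(), True)
])

df = spark.read\
    .option("header", "false")\
    .option("delimiter", "\t")\
    .csv('hdfs://hdfspath/dt=20180101/*/*/*', schema)  # illustrative path

# cast afterwards: values like '-' become null instead of throwing at df.show()
df = df.withColumn("field3", df.field3.cast("int"))
df.show()

If you genuinely need '' rather than null, keep the column as a string and blank out the non-numeric values instead, e.g. when(df.field3.cast('int').isNull(), '').otherwise(df.field3) using when from pyspark.sql.functions.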