Using PySpark, how do I reject bad (malformed) records from a CSV file and save these rejected records in a new file?
I am using PySpark to load data from a CSV file into a DataFrame. I am able to load the data while dropping the malformed records, but how can I reject these bad (malformed) records from the CSV file and save the rejected records in a new file?
Here is an idea, although I am not very happy with it. As you know, the CSV parser has different modes to drop malformed data. However, if no mode is specified, it 'fills the blanks' with default null values. You can use that to your advantage.
Using this data, and assuming the column article_id is not nullable by design:
1,abcd,correct record1,description1 haha
Bad record,Bad record description
3,hijk,another correct record,description2
Not_An_Integer,article,no integer type,description
Here is the code:
#!/usr/bin/env python
# coding: utf-8
import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)
# Load the data with your schema, drop the malformed information
schema = StructType([StructField("article_id", IntegerType()),
                     StructField("title", StringType()),
                     StructField("short_desc", StringType()),
                     StructField("article_desc", StringType())])
valid_data = spark.read.format("csv").schema(schema).option("mode","DROPMALFORMED").load("./data.csv")
valid_data.show()
"""
+----------+-----+--------------------+-----------------+
|article_id|title| short_desc| article_desc|
+----------+-----+--------------------+-----------------+
| 1| abcd| correct record1|description1 haha|
| 3| hijk|another correct r...| description2|
+----------+-----+--------------------+-----------------+
"""
# Load the data and let spark infer everything
malformed_data = spark.read.format("csv").option("header", "false").load("./data.csv")
malformed_data.show()
"""
+--------------+--------------------+--------------------+-----------------+
| _c0| _c1| _c2| _c3|
+--------------+--------------------+--------------------+-----------------+
| 1| abcd| correct record1|description1 haha|
| Bad record|Bad record descri...| null| null|
| 3| hijk|another correct r...| description2|
|Not_An_Integer| article| no integer type| description|
+--------------+--------------------+--------------------+-----------------+
"""
# Join and keep all data from the 'malformed' DataFrame.
merged = valid_data.join(malformed_data, on=valid_data.article_id == malformed_data._c0, how="right")
# Keep only the records for which no match with the 'valid' data was found
malformed = merged.where(F.isnull(merged.article_id))
malformed.show()
"""
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
|article_id|title|short_desc|article_desc| _c0| _c1| _c2| _c3|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
| null| null| null| null| Bad record|Bad record descri...| null| null|
| null| null| null| null|Not_An_Integer| article|no integer type|description|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
"""
I don't really like this solution, as it is very sensitive to how Spark parses the CSV, and it may not work for every file, but you might find it useful.
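A different route, if your Spark version supports it, is to read in PERMISSIVE mode with the columnNameOfCorruptRecord option, which stores each malformed input line in an extra string column. This is only a sketch under that assumption (note that recent Spark versions require caching the DataFrame before the corrupt-record column can be queried on its own), and './rejected_lines' is a placeholder path:
# Sketch: capture malformed lines via the corrupt-record column
# (assumes the option is available in your Spark version).
schema_with_corrupt = StructType([StructField("article_id", IntegerType()),
                                  StructField("title", StringType()),
                                  StructField("short_desc", StringType()),
                                  StructField("article_desc", StringType()),
                                  StructField("_corrupt_record", StringType())])
raw = spark.read.format("csv") \
    .schema(schema_with_corrupt) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load("./data.csv") \
    .cache()  # cache so the corrupt-record column can be queried on its own
rejected_lines = raw.where(F.col("_corrupt_record").isNotNull()).select("_corrupt_record")
rejected_lines.write.mode("overwrite").text("./rejected_lines")  # placeholder path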