如何在使用 PySpark 将 CSV 文件作为数据帧读取时跳过行?
How to skip lines while reading a CSV file as a dataFrame using PySpark?
我有一个结构如下的 CSV 文件:
Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
我在阅读这个文件时遇到了两个问题。
- 我想忽略 Header 并忽略空白行
- 值中的逗号不是分隔符
这是我尝试过的:
df = sc.textFile("myFile.csv")\
.map(lambda line: line.split(","))\ #Split By comma
.filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows
但是,这不起作用,因为值中的逗号被读取为分隔符并且 len(line)
返回 4 而不是 2。
我尝试了另一种方法:
data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
当时的想法是使用过滤器而不是阅读 headers。但是,当我尝试打印 headers 时,我得到了编码值。
[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
读取 CSV 文件并跳过前两行的正确方法是什么?
尝试使用 csv.reader 和 'quotechar' parameter.It 将正确拆分行。
之后您可以根据需要添加过滤器。
import csv
from pyspark.sql.types import StringType
df = sc.textFile("test2.csv")\
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"')).filter(lambda line: len(line)>=2 and line[0]!= 'Col1')\
.toDF(['Col1','Col2'])
对于您的第一个问题,只需使用 zipWithIndex
压缩 RDD 中的行并过滤您不需要的行。
对于第二个问题,您可以尝试从行中删除第一个和最后一个双引号字符,然后在 ","
.
上拆分行
rdd = sc.textFile("myfile.csv")
rdd.zipWithIndex().
filter(lambda x: x[1] > 2).
map(lambda x: x[0]).
map(lambda x: x.strip('"').split('","')).
toDF(["Col1", "Col2"])
尽管如此,如果您正在寻找在 Spark 中处理 CSV 文件的标准方法,最好使用来自 databricks 的 spark-csv
包。
您为什么不试试 pyspark.sql
中的 DataFrameReader
API?这很容易。对于这个问题,我想这一行就足够了。
df = spark.read.csv("myFile.csv") # By default, quote char is " and separator is ','
有了这个 API,您还可以尝试使用其他一些参数,例如 header 行,忽略前导和尾随空格。这是link:DataFrameReader API
如果CSV文件结构总是有两列,在Scala上可以实现:
val struct = StructType(
StructField("firstCol", StringType, nullable = true) ::
StructField("secondCol", StringType, nullable = true) :: Nil)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("inferSchema", "false")
.option("delimiter", ",")
.option("quote", "\"")
.schema(struct)
.load("myFile.csv")
df.show(false)
val indexed = df.withColumn("index", monotonicallyIncreasingId())
val filtered = indexed.filter(col("index") > 2).drop("index")
filtered.show(false)
结果是:
+---------+---------+
|firstCol |secondCol|
+---------+---------+
|Header |null |
|Blank Row|null |
|Col1 |Col2 |
|1,200 |1,456 |
|2,000 |3,450 |
+---------+---------+
+--------+---------+
|firstCol|secondCol|
+--------+---------+
|1,200 |1,456 |
|2,000 |3,450 |
+--------+---------+
Zlidime 的回答很正确。工作解决方案是这样的:
import csv
customSchema = StructType([ \
StructField("Col1", StringType(), True), \
StructField("Col2", StringType(), True)])
df = sc.textFile("file.csv")\
.mapPartitions(lambda partition: csv.reader([line.replace('[=10=]','') for line in partition],delimiter=',', quotechar='"')).filter(lambda line: len(line) > 2 and line[0] != 'Col1')\
.toDF(customSchema)
我有一个结构如下的 CSV 文件:
Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
我在阅读这个文件时遇到了两个问题。
- 我想忽略 Header 并忽略空白行
- 值中的逗号不是分隔符
这是我尝试过的:
df = sc.textFile("myFile.csv")\
.map(lambda line: line.split(","))\ #Split By comma
.filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows
但是,这不起作用,因为值中的逗号被读取为分隔符并且 len(line)
返回 4 而不是 2。
我尝试了另一种方法:
data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
当时的想法是使用过滤器而不是阅读 headers。但是,当我尝试打印 headers 时,我得到了编码值。
[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
读取 CSV 文件并跳过前两行的正确方法是什么?
尝试使用 csv.reader 和 'quotechar' parameter.It 将正确拆分行。 之后您可以根据需要添加过滤器。
import csv
from pyspark.sql.types import StringType
df = sc.textFile("test2.csv")\
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"')).filter(lambda line: len(line)>=2 and line[0]!= 'Col1')\
.toDF(['Col1','Col2'])
对于您的第一个问题,只需使用 zipWithIndex
压缩 RDD 中的行并过滤您不需要的行。
对于第二个问题,您可以尝试从行中删除第一个和最后一个双引号字符,然后在 ","
.
rdd = sc.textFile("myfile.csv")
rdd.zipWithIndex().
filter(lambda x: x[1] > 2).
map(lambda x: x[0]).
map(lambda x: x.strip('"').split('","')).
toDF(["Col1", "Col2"])
尽管如此,如果您正在寻找在 Spark 中处理 CSV 文件的标准方法,最好使用来自 databricks 的 spark-csv
包。
您为什么不试试 pyspark.sql
中的 DataFrameReader
API?这很容易。对于这个问题,我想这一行就足够了。
df = spark.read.csv("myFile.csv") # By default, quote char is " and separator is ','
有了这个 API,您还可以尝试使用其他一些参数,例如 header 行,忽略前导和尾随空格。这是link:DataFrameReader API
如果CSV文件结构总是有两列,在Scala上可以实现:
val struct = StructType(
StructField("firstCol", StringType, nullable = true) ::
StructField("secondCol", StringType, nullable = true) :: Nil)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("inferSchema", "false")
.option("delimiter", ",")
.option("quote", "\"")
.schema(struct)
.load("myFile.csv")
df.show(false)
val indexed = df.withColumn("index", monotonicallyIncreasingId())
val filtered = indexed.filter(col("index") > 2).drop("index")
filtered.show(false)
结果是:
+---------+---------+
|firstCol |secondCol|
+---------+---------+
|Header |null |
|Blank Row|null |
|Col1 |Col2 |
|1,200 |1,456 |
|2,000 |3,450 |
+---------+---------+
+--------+---------+
|firstCol|secondCol|
+--------+---------+
|1,200 |1,456 |
|2,000 |3,450 |
+--------+---------+
Zlidime 的回答很正确。工作解决方案是这样的:
import csv
customSchema = StructType([ \
StructField("Col1", StringType(), True), \
StructField("Col2", StringType(), True)])
df = sc.textFile("file.csv")\
.mapPartitions(lambda partition: csv.reader([line.replace('[=10=]','') for line in partition],delimiter=',', quotechar='"')).filter(lambda line: len(line) > 2 and line[0] != 'Col1')\
.toDF(customSchema)