Databricks pyspark,使用 header='false' 时 Dataframe.count() 和 Display(Dataframe) 的结果差异

Databricks pyspark, Difference in result of Dataframe.count() and Display(Dataframe) while using header='false'

我正在通过以下代码读取数据框中的 CSV(存在于 Azure 数据湖存储中)文件:

df = spark.read.load(filepath, format="csv", schema = mySchema, header="false", mode="DROPMALFORMED");

文件 文件路径 包含 100 行和 header。我想在阅读时忽略文件中的 header 所以我定义了 header="false" 。 (有时文件带有 header,有时则没有)

当我通过 display(df) 语句显示数据帧时读取数据帧后,我得到了所有数据并显示了 100 行,这是正确的。但是当我过去使用 df.count() 检查数据帧的计数时,它显示了 101 行。数据框是否显示 header 的计数?还是我错过了什么?

mySchemafilepath 已在单元格中单独定义。

根据 pyspark 文档,

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

header – uses the first line as names of columns. If None is set, it uses the default value, false.

您可能需要标准化获取数据的方式,使用 header 或不使用 header,然后设置标志。

如果您设置 header=False,那么 Spark 引擎将只读取第一行作为数据行。

为了回答你的问题,Dataframe 计数不算 header。 我建议先读取数据,然后删除 headers 以进行调试。

此外,display(df) 是由 Ipython 提供的 python 操作,我会使用 dataframe.show() spark 提供的用于调试目的的实用程序。

您在读取 ​​csv 文件时 mode="DROPMALFORMED"

  • 当有一些格式错误的记录时,spark 将它们删除 in df.show() but counts them in df.count().
  • 在您的情况下,header 为假且指定了架构,因此 spark 会根据您指定的类型读取数据,如果存在问题,则不会显示记录

Example:

#sample data
#cat employee.csv
#id,name,salary,deptid
#1,a,1000,101
#2,b,2000,201

ss=StructType([StructField("id",IntegerType()),StructField("name",StringType()),StructField("salary",StringType()),StructField("deptid",StringType())])

df=spark.read.load("employee.csv",format="csv",schema=ss,header="false",mode="DROPMALFORMED")


df.show()
#+---+----+------+------+
#| id|name|salary|deptid|
#+---+----+------+------+
#|  1|   a|  1000|   101|
#|  2|   b|  2000|   201|
#+---+----+------+------+

#issue in df.count
df.count()
#3 #has to be 2

To Fix:

在作为数据帧读取时添加 notNull 过滤器。

df=spark.read.load("employee.csv",format="csv",schema=ss,header="false",mode="DROPMALFORMED").filter(col("id").isNotNull())

df.show()
#+---+----+------+------+
#| id|name|salary|deptid|
#+---+----+------+------+
#|  1|   a|  1000|   101|
#|  2|   b|  2000|   201|
#+---+----+------+------+

#fixed count
df.count()
#2

查看格式错误的数据删除模式:

spark.read.load("employee.csv",format="csv",schema= mySchema,header="false").show(100,False)