如何从 Dataframe 中删除 header 和页脚?
How to Remove header and footer from Dataframe?
我正在阅读包含 header、内容和页脚使用
的文本(非 CSV)文件
spark.read.format("text").option("delimiter","|")...load(file)
我可以使用 df.first()
访问 header。有接近df.last()
或df.reverse().first()
的东西吗?
假设文件不是很大,我们可以使用 collect 作为迭代器获取数据帧,并按如下方式访问最后一个元素:
df = df.collect()[data.count()-1]
避免在大型数据集上使用 collect
。
或
我们可以使用 take 截断最后一行。
df = df.take(data.count()-1)
示例数据:
col1|col2|col3
100|hello|asdf
300|hi|abc
200|bye|xyz
800|ciao|qwerty
This is the footer line
处理逻辑:
#load text file
txt = sc.textFile("path_to_above_sample_data_text_file.txt")
#remove header
header = txt.first()
txt = txt.filter(lambda line: line != header)
#remove footer
txt = txt.map(lambda line: line.split("|"))\
.filter(lambda line: len(line)>1)
#convert to dataframe
df=txt.toDF(header.split("|"))
df.show()
输出为:
+----+-----+------+
|col1| col2| col3|
+----+-----+------+
| 100|hello| asdf|
| 300| hi| abc|
| 200| bye| xyz|
| 800| ciao|qwerty|
+----+-----+------+
除上述答案外,solution fits good
下方包含 multiple
header
和 footer
行的文件:-
val data_delimiter = "|"
val skipHeaderLines = 5
val skipHeaderLines = 3
//-- Read file into Dataframe and convert to RDD
val dataframe = spark.read.option("wholeFile", true).option("delimiter",data_delimiter).csv(s"hdfs://$in_data_file")
val rdd = dataframe.rdd
//-- RDD without header and footer
val dfRdd = rdd.zipWithIndex().filter({case (line, index) => index != (cnt - skipFooterLines) && index > (skipHeaderLines - 1)}).map({case (line, index) => line})
//-- Dataframe without header and footer
val df = spark.createDataFrame(dfRdd, dataframe.schema)
假设您的文本文件有 JSON header 和页脚,
火花 SQL 方式,
示例数据
{"":[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]}
这里可以通过以下 3 行(假设数据中没有 Tilda)来避免 header,
jsonToCsvDF=spark.read.format("com.databricks.spark.csv").option("delimiter", "~").load(<Blob Path1/ ADLS Path1>)
jsonToCsvDF.createOrReplaceTempView("json_to_csv")
spark.sql("SELECT SUBSTR(`_c0`,5,length(`_c0`)-5) FROM json_to_csv").coalesce(1).write.option("header",false).mode("overwrite").text(<Blob Path2/ ADLS Path2>)
现在输出看起来像,
[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]
我正在阅读包含 header、内容和页脚使用
的文本(非 CSV)文件spark.read.format("text").option("delimiter","|")...load(file)
我可以使用 df.first()
访问 header。有接近df.last()
或df.reverse().first()
的东西吗?
假设文件不是很大,我们可以使用 collect 作为迭代器获取数据帧,并按如下方式访问最后一个元素:
df = df.collect()[data.count()-1]
避免在大型数据集上使用 collect
。
或
我们可以使用 take 截断最后一行。
df = df.take(data.count()-1)
示例数据:
col1|col2|col3
100|hello|asdf
300|hi|abc
200|bye|xyz
800|ciao|qwerty
This is the footer line
处理逻辑:
#load text file
txt = sc.textFile("path_to_above_sample_data_text_file.txt")
#remove header
header = txt.first()
txt = txt.filter(lambda line: line != header)
#remove footer
txt = txt.map(lambda line: line.split("|"))\
.filter(lambda line: len(line)>1)
#convert to dataframe
df=txt.toDF(header.split("|"))
df.show()
输出为:
+----+-----+------+
|col1| col2| col3|
+----+-----+------+
| 100|hello| asdf|
| 300| hi| abc|
| 200| bye| xyz|
| 800| ciao|qwerty|
+----+-----+------+
除上述答案外,solution fits good
下方包含 multiple
header
和 footer
行的文件:-
val data_delimiter = "|"
val skipHeaderLines = 5
val skipHeaderLines = 3
//-- Read file into Dataframe and convert to RDD
val dataframe = spark.read.option("wholeFile", true).option("delimiter",data_delimiter).csv(s"hdfs://$in_data_file")
val rdd = dataframe.rdd
//-- RDD without header and footer
val dfRdd = rdd.zipWithIndex().filter({case (line, index) => index != (cnt - skipFooterLines) && index > (skipHeaderLines - 1)}).map({case (line, index) => line})
//-- Dataframe without header and footer
val df = spark.createDataFrame(dfRdd, dataframe.schema)
假设您的文本文件有 JSON header 和页脚, 火花 SQL 方式,
示例数据
{"":[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]}
这里可以通过以下 3 行(假设数据中没有 Tilda)来避免 header,
jsonToCsvDF=spark.read.format("com.databricks.spark.csv").option("delimiter", "~").load(<Blob Path1/ ADLS Path1>)
jsonToCsvDF.createOrReplaceTempView("json_to_csv")
spark.sql("SELECT SUBSTR(`_c0`,5,length(`_c0`)-5) FROM json_to_csv").coalesce(1).write.option("header",false).mode("overwrite").text(<Blob Path2/ ADLS Path2>)
现在输出看起来像,
[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]