Spark Dataframe not splitting on pipe "|" for log files
I am fairly new to Spark and am experimenting before I come up with a Streaming Spark ETL process. I have a log file without column headers, in this format -
2019-08-02 00:25:59,116|10.29.2.5||unknown|||0|0|0|250|250|272||400|71b8fde0-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|
2019-08-02 00:26:00,302|10.29.2.6||unknown|||0|0|0|250|250|197||400|71b8fde2-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|
2019-08-02 00:26:04,142|10.29.2.5||unknown|||0|0|0|250|250|285||400|71b8fde4-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|
2019-08-02 00:26:04,254|10.1.198.43|splunk|getObject|splunk|splunk|160|0|200|696|1056|15875|ad%2Fdma%2Fe6%2F8f%2F464%7E1B6859C4-4A99-4DE3-B80D-2DC893813BFD%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_traps%2Freceipt.json|200|71b8fde6-5f8e-14a9-8d12-54ab3a911327|64625d8ce9070fa1d91d9da44c8ce8c6|0|
2019-08-02 00:26:04,259|10.1.198.47|splunk|getObject|splunk|splunk|160|0|200|700|1060|10366|ad%2Fdma%2F6d%2F70%2F461%7E827F7E6F-7508-4E30-87A9-8BBDB5AFA944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_firewall%2Freceipt.json|200|71b8fde8-5f8e-14a9-8d12-54ab3a911327|11afe1e144bb29dcb14fba199fa000de|0|
I am using the read.text() method to get a dataframe (not sure whether there is any advantage to using read.textFile(), which returns a Dataset) -
val logRecordDF = spark.read.option("delimiter", "|").textFile("/Users/atekade/Desktop/abc-logs/abc-request-info.log.2019-08-02.1")
But the dataset I get back looks like this -
[2019-08-02 00:00:00,359|10.29.2.6||unknown|||0|0|0|250|250|246||400|71b8f902-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|]
[2019-08-02 00:00:04,129|10.29.2.5||unknown|||0|0|0|250|250|279||400|71b8f904-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|]
[2019-08-02 00:00:05,346|10.29.2.6||unknown|||0|0|0|250|250|226||400|71b8f906-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|]
[2019-08-02 00:00:09,144|10.29.2.5||unknown|||0|0|0|250|250|267||400|71b8f908-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|]
I tried adding the following split combinations, but none of them gave me the properly formatted data -
- 选项("sep", "[|]")
- 选项("sep", "|")
- 选项("sep", "\|")
- 和 "delimiter" 的相同组合。
What am I doing wrong here? Just to give some more context, my data has no column names, so I created a customSchema with StructType and also tried passing it via the schema() method while creating the dataframe, but neither approach worked. Is there a specific requirement of the read API?
Instead of reading the file as text, try reading it with the .csv API (which splits the file on a delimiter) and then add your schema to the dataframe:
spark.read.option("sep","|").csv("<file_path>").show(false)
Example:
import spark.implicits._ // needed for .toDS on a local Seq
val str = Seq(("2019-08-02 00:25:59,116|10.29.2.5||unknown|||0|0|0|250|250|272||400|71b8fde0-5f8e-14a9-8d12-54ab3a911327|0|InvalidBucketName|")).toDS
spark.read.option("sep","|").csv(str).show(false)
//(or)
spark.read.option("delimiter","|").csv(str).show(false)
Result:
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
|_c0                    |_c1      |_c2 |_c3    |_c4 |_c5 |_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14                                |_c15|_c16             |_c17|
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
|2019-08-02 00:25:59,116|10.29.2.5|null|unknown|null|null|0  |0  |0  |250|250 |272 |null|400 |71b8fde0-5f8e-14a9-8d12-54ab3a911327|0   |InvalidBucketName|null|
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
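If you want named columns straight away, the schema can be attached on the same read. A minimal sketch, assuming an active SparkSession; the generic col0..col17 names below are placeholders, not the real field names of this log format:
import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Placeholder schema: 18 string fields named col0..col17.
val logSchema = StructType(
  (0 until 18).map(i => StructField(s"col$i", StringType, nullable = true))
)

val logDF = spark.read
  .option("sep", "|")
  .schema(logSchema)
  .csv("<file_path>")

logDF.printSchema()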
spark.read.text reads the file as a single column and, as of now, has no way to specify a delimiter; it reads the whole file into a value column.
You can use
spark.read.format("csv").option("delimiter", "|").load("/FileStore/tables/logfile.txt")
You can also specify a schema if required. It will load your file as:
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
|_c0                    |_c1      |_c2 |_c3    |_c4 |_c5 |_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14                                |_c15|_c16             |_c17|
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
|2019-08-02 00:25:59,116|10.29.2.5|null|unknown|null|null|0  |0  |0  |250|250 |272 |null|400 |71b8fde0-5f8e-14a9-8d12-54ab3a911327|0   |InvalidBucketName|null|
|2019-08-02 00:26:00,302|10.29.2.6|null|unknown|null|null|0  |0  |0  |250|250 |197 |null|400 |71b8fde2-5f8e-14a9-8d12-54ab3a911327|0   |InvalidBucketName|null|
|2019-08-02 00:26:04,142|10.29.2.5|null|unknown|null|null|0  |0  |0  |250|250 |285 |null|400 |71b8fde4-5f8e-14a9-8d12-54ab3a911327|0   |InvalidBucketName|null|
+-----------------------+---------+----+-------+----+----+---+---+---+---+----+----+----+----+------------------------------------+----+-----------------+----+
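For completeness: since spark.read.text only gives the single value column, that column can still be split manually with the split function (the pipe must be escaped, because split takes a regex). This is only a sketch; the column names and indexes below follow the sample rows above and may need adjusting:
import org.apache.spark.sql.functions.{col, split}

val raw = spark.read.text("/FileStore/tables/logfile.txt")
val parts = split(col("value"), "\\|")

val manualDF = raw.select(
  parts.getItem(0).as("timestamp"),   // e.g. 2019-08-02 00:25:59,116
  parts.getItem(1).as("ip"),          // e.g. 10.29.2.5
  parts.getItem(3).as("operation"),   // e.g. getObject / unknown
  parts.getItem(13).as("http_status") // e.g. 400 / 200
)
manualDF.show(false)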
Here is the approach I used. Thanks to @Sagar and @Shu for the pointer that I needed to read with the csv API instead of text.
Created a custom schema -
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
  "reqhdr|reqbd|reshdr|resbd|totsize|" +
  "duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"

val customSchema = StructType(
  schemaString
    .split('|')
    .map(fieldName => StructField(fieldName, StringType, true))
)
This gave me the schema -
customSchema: org.apache.spark.sql.types.StructType = StructType(StructField(dttm,StringType,true), StructField(ip,StringType,true), StructField(bktownr,StringType,true), StructField(oper,StringType,true), StructField(bktnm,StringType,true), StructField(usr,StringType,true), StructField(reqhdr,StringType,true), StructField(reqbd,StringType,true), StructField(reshdr,StringType,true), StructField(resbd,StringType,true), StructField(totsize,StringType,true), StructField(duration,StringType,true), StructField(objnm,StringType,true), StructField(httpstts,StringType,true), StructField(s3reqid,StringType,true), StructField(etag,StringType,true), StructField(errcd,StringType,true), StructField(srcbkt,StringType,true))
I can then read the file like this -
import org.apache.spark.sql.DataFrame

val reqLogDF: DataFrame = spark
  .read
  .format("csv")
  .option("header", "false") // the log file has no header row
  .option("delimiter", "|")
  .schema(customSchema)
  .load("/Users/atekade/Desktop/cloudian-logs/cloudian-request-info.log.2019-08-11.1")
  .toDF
reqLogDF.collect.foreach(println)
which gave the following output -
[2019-08-11 00:00:03,002,10.1.198.42,splunk,getObject,splunk,splunk,160,0,55,246,461,2418,aws%2Fra%2Fdd%2F40%2F49%7E2D3930C7-6EC3-4134-8CF6-EED2B577A63B%2FB2CD75CD-EF9C-4844-BD2E-22805FEB53AA_splunk_app_aws_nobody_NSa7cd87ffa1c278dc%2Freceipt.json,404,4a4adcf6-f65b-14aa-8d12-54ab3a911327,0,NoSuchKey,null]
[2019-08-11 00:00:03,034,10.1.198.42,splunk,headObject,splunk,splunk,160,0,55,246,461,1523,aws%2Fra%2Fdd%2F40%2F49%7E2D3930C7-6EC3-4134-8CF6-EED2B577A63B%2FB2CD75CD-EF9C-4844-BD2E-22805FEB53AA_splunk_app_aws_nobody_NSa7cd87ffa1c278dc%2Freceipt.json,404,4a4adcf8-f65b-14aa-8d12-54ab3a911327,0,NoSuchKey,null]
[2019-08-11 00:00:03,043,10.1.198.42,splunk,getObject,splunk,splunk,160,0,55,246,461,1837,aws%2Fra%2Ffe%2Fe1%2F47%7E2D3930C7-6EC3-4134-8CF6-EED2B577A63B%2FB2CD75CD-EF9C-4844-BD2E-22805FEB53AA_splunk_app_aws_nobody_NSa7cd87ffa1c278dc%2Freceipt.json,404,4a4adcfa-f65b-14aa-8d12-54ab3a911327,0,NoSuchKey,null]
[2019-08-11 00:00:03,095,10.1.198.42,splunk,headObject,splunk,splunk,160,0,55,246,461,1700,aws%2Fra%2Ffe%2Fe1%2F47%7E2D3930C7-6EC3-4134-8CF6-EED2B577A63B%2FB2CD75CD-EF9C-4844-BD2E-22805FEB53AA_splunk_app_aws_nobody_NSa7cd87ffa1c278dc%2Freceipt.json,404,4a4adcfc-f65b-14aa-8d12-54ab3a911327,0,NoSuchKey,null]
[2019-08-11 00:00:03,550,10.29.2.5,null,unknown,null,null,0,0,0,250,250,26
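Since this is groundwork for a Streaming Spark ETL process, the same options and customSchema should carry over to Structured Streaming as well. A rough sketch, not from the answers above; the watched directory path is illustrative, and a schema is mandatory for streaming file sources:
// Sketch: reuse the delimiter option and customSchema with readStream.
val reqLogStreamDF = spark.readStream
  .format("csv")
  .option("delimiter", "|")
  .schema(customSchema)
  .load("/Users/atekade/Desktop/cloudian-logs/")

// Write the parsed records to the console for a quick check.
val query = reqLogStreamDF.writeStream
  .format("console")
  .option("truncate", "false")
  .start()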