为什么在使用模式查询时所有字段都为空?

Why are all fields null when querying with schema?

我正在使用结构化流式传输,其模式在案例 class 和编码器的帮助下指定,以获取流式数据帧。

import java.sql.Timestamp
case class SampleLogEntry(
  dateTime: Timestamp,
  clientIp: String,
  userId: String,
  operation: String,
  bucketName: String,
  contAccUsrId: String,
  reqHeader: Integer,
  reqBody: Integer,
  respHeader: Integer,
  respBody: Integer,
  totalReqResSize: Integer,
  duration: Integer,
  objectName: String,
  httpStatus: Integer,
  s3ReqId: String,
  etag: String,
  errCode: Integer,
  srcBucket: String
)

import org.apache.spark.sql.Encoders
val sampleLogSchema = Encoders.product[SampleLogEntry].schema

val rawData = spark
  .readStream
  .format("csv")
  .option("delimiter", "|")
  .option("header", true)
  .schema(sampleLogSchema)
  .load("/Users/home/learning-spark/logs")

但是,我只得到 null 具有此架构的值:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|dateTime|  IP|userId|s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|reqestId|objectTag|errCode|srcBucket|
+--------+----+------+-----+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------+---------+-------+---------+
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|
|    null|null|  null| null|      null|        null|     null|   null|      null|    null|     null|    null|      null|      null|    null|     null|   null|     null|

当我将此模式中的每个字段更改为 String 时,它工作正常,并获得以下预期输出。为什么?

+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|            dateTime|       IP|userId|  s3Api|bucketName|accessUserId|reqHeader|reqBody|respHeader|respBody|totalSize|duration|objectName|httpStatus|            reqestId|objectTag|          errCode|srcBucket|
+--------------------+---------+------+-------+----------+------------+---------+-------+----------+--------+---------+--------+----------+----------+--------------------+---------+-----------------+---------+
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     247|      null|       400|08084d90-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     291|      null|       400|08084d92-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     246|      null|       400|08084d94-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.6|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     227|      null|       400|08084d96-299e-14a...|        0|InvalidBucketName|     null|
|2019-07-18 00:00:...|10.29.2.5|  null|unknown|      null|        null|        0|      0|         0|     250|      250|     287|      null|       400|08084d98-299e-14a...|        0|InvalidBucketName|     null|

我的数据样本 -

2019-07-22 00:10:15,030|10.29.2.6||unknown|||0|0|0|250|250|251||400|05591428-86b7-14a8-8d12-54ab3a911327|0|InvalidBucketName|
2019-07-22 00:10:15,334|10.1.198.41|splunk|putObject|splunk|splunk|216|75|96|0|387|14117|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_splunk_app_aws_Detailed_Billing%2FguidSplunk-6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2Fmetadata_checksum|200|0559142a-86b7-14a8-8d12-54ab3a911327|73d2834dffe4f89007d892f20326e0e6|0|
2019-07-22 00:10:15,400|10.1.198.41|splunk|putObject|splunk|splunk|217|689|96|0|1002|27898|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_splunk_app_aws_S3_Access_Log%2Freceipt.json|200|0559142c-86b7-14a8-8d12-54ab3a911327|3214c88c08b9d96b085a8ca5921827e6|0|
2019-07-22 00:10:15,511|10.1.198.41|splunk|putObject|splunk|splunk|217|697|96|0|1010|5483|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_splunk_app_aws_CloudFront_Access_Log%2Freceipt.json|200|0559142e-86b7-14a8-8d12-54ab3a911327|659d8a9f6b899755e03b709c069016cd|0|
2019-07-22 00:10:16,911|10.1.198.41|splunk|putObject|splunk|splunk|216|75|96|0|387|7295|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2FguidSplunk-6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2Fmetadata_checksum|200|05591430-86b7-14a8-8d12-54ab3a911327|bf6873f745bc661bc940b7f13a00c314|0|
2019-07-22 00:10:16,952|10.1.198.41|splunk|putObject|splunk|splunk|217|696|96|0|1009|5206|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_traps%2Freceipt.json|200|05591432-86b7-14a8-8d12-54ab3a911327|e06d1193d3ed2503d6e378630cf6003b|0|
2019-07-22 00:10:17,263|10.1.198.41|splunk|putObject|splunk|splunk|217|699|96|0|1012|5142|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_aperture%2Freceipt.json|200|05591434-86b7-14a8-8d12-54ab3a911327|97acdf7a24b733c011aa7fad0637b526|0|
2019-07-22 00:10:17,540|10.1.198.41|splunk|putObject|splunk|splunk|219|20787|96|0|21102|6861|aws%2Fdma%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_firewall%2FguidSplunk-6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2Fmetadata.csv|200|05591436-86b7-14a8-8d12-54ab3a911327|2bc8b90fdb1b5b156cf864005ca6c060|0|
2019-07-22 00:10:17,591|10.1.198.41|splunk|putObject|splunk|splunk|217|783|96|0|1096|5074|aws%2Fra%2F10%2F1b%2F32%7E6A3CD05A-AA46-47CF-B2E0-AA16F08FB944%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_splunk_app_aws_nobody_NSbbffede24e849f9b%2Freceipt.json|200|05591438-86b7-14a8-8d12-54ab3a911327|28da300a36bb1e054db7f60a8aad5b4a|0|
2019-07-22 00:10:17,826|10.29.2.5||unknown|||0|0|0|250|250|196||400|0559143a-86b7-14a8-8d12-54ab3a911327|0|InvalidBucketName|
2019-07-22 00:10:20,030|10.29.2.6||unknown|||0|0|0|250|250|272||400|0559143c-86b7-14a8-8d12-54ab3a911327|0|InvalidBucketName|

如何设置架构以便我可以从具有指定字段数据类型的日志文件中读取数据?

它对我来说很好用。

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class Person(id: Int, name: String)
defined class Person

scala> val data = List(("id|name"),("10|xyz")).toDS()
data: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val delimited = spark.read.
     |       option("delimiter", "|").
     |       option("header", "true").
     |       schema(Encoders.product[Person].schema).
     |       csv(data)
delimited: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> delimited.printSchema
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)


scala> delimited.show(false)
+---+----+
|id |name|
+---+----+
|10 |xyz |
+---+----+

val rawData = spark .readStream .format("") .option("delimiter", "|") .option("header", "true") .schema(sampleLogSchema) .load("/Users/home/learning-spark/logs")

在您的代码中,您可以将格式选项更改为 csv

问题是 dateTime: Timestamp 字段。使用模式,您告诉 Spark SQL 使用时间戳感知解析器,该解析器执行字符串到时间戳的转换,但由于格式不正确而失败。没有模式,所有数据都是字符串,所以任何匹配都很好。

您必须使用 timestampFormat 选项指定正确的格式来解析类似 2019-07-22 00:10:15,030 的日期。它们根本不符合底层时间戳解析器的预期,因此所有值都得到 nulls.

option("timestampFormat", "yyyy-MM-dd HH:mm:ss,SSS")format("csv") 结合使用,structured/streaming 查询应该没问题。

p.s。这是 Spark SQL 中的一个更普遍的问题(并在 Spark Structured Streaming 中表现出来,它在幕后使用 Spark SQL 引擎执行)。

感谢 Jacek Laskowski 的回答。我将我的 datetime 字段更改为 java.sql.Timestamp 并在我的流式读取操作中包含了选项("timestampFormat"、"yyyy-MM-dd HH:mm:ss,SSS")。整体代码如下所示 -

import java.sql.Timestamp
case class SampleLogEntry(
  dateTime: Timestamp,
  clientIp: String,
  userId: String,
  operation: String,
  bucketName: String,
  contAccUsrId: String,
  reqHeader: Integer,
  reqBody: Integer,
  respHeader: Integer,
  respBody: Integer,
  totalReqResSize: Integer,
  duration: Integer,
  objectName: String,
  httpStatus: Integer,
  s3ReqId: String,
  etag: String,
  errCode: Integer,
  srcBucket: String
)

import org.apache.spark.sql.Encoders
val sampleLogSchema = Encoders.product[SampleLogEntry].schema

val rawData = spark
  .readStream
  .format("csv")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss,SSS")
  .option("delimiter", "|")
  .option("header", true)
  .schema(sampleLogSchema)
  .load("/Users/home/learning-spark/logs")