Spark Structured streaming - reading timestamp from file using schema
I am working on a Structured Streaming job.
The data I read from files contains a timestamp (in milliseconds), a deviceId, and the value reported by that device.
Multiple devices report data.
I am trying to write a job that aggregates (sums) the values sent by all devices into 1-minute windows.
The problem I am running into is the timestamp.
When I try to parse "timestamp" as a Long, the window function complains that it requires a "timestamp type".
When I try to parse it as TimestampType, as in the snippet below, I get a scala.MatchError
exception (the full exception can be seen below), and I am struggling to figure out why, and what the correct way to handle it is.
// Create schema
StructType readSchema = new StructType().add("value", "integer")
.add("deviceId", "long")
.add("timestamp", new TimestampType());
// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
.schema(readSchema)
.parquet(path);
Dataset<Row> aggregations = inputDataFrame.groupBy(window(inputDataFrame.col("timestamp"), "1 minutes"),
inputDataFrame.col("deviceId"))
.agg(sum("value"));
Exception:
scala.MatchError: org.apache.spark.sql.types.TimestampType@3eeac696 (of class org.apache.spark.sql.types.TimestampType)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:215)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:212)
at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1692)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor(RowEncoder.scala:175)
at scala.collection.TraversableLike.$anonfun$flatMap(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:171)
at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows(Dataset.scala:92)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
at org.apache.spark.sql.streaming.DataStreamReader.parquet(DataStreamReader.scala:450)
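The MatchError itself most likely comes from how the type is constructed: Spark's RowEncoder matches the schema's field types against singleton DataType objects, and a freshly created new TimestampType() is a different instance from the TimestampType singleton, so none of the match cases fire. In Java the singleton is exposed as DataTypes.TimestampType. As an aside, here is a sketch of the schema built with the singleton, under the assumption that the parquet column actually held timestamp-typed data (which is not the case here, since the files store millis as long):
// Assumption: only applicable if the parquet files actually store a
// timestamp column; here they store milliseconds as long, so the
// conversion approach below is what applies.
StructType readSchema = new StructType()
        .add("value", "integer")
        .add("deviceId", "long")
        .add("timestamp", DataTypes.TimestampType);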
Typically, when your timestamp is stored as a long in milliseconds, you can convert it into a timestamp type as follows:
// Create schema and keep column 'timestamp' as long
StructType readSchema = new StructType()
.add("value", "integer")
.add("deviceId", "long")
.add("timestamp", "long");
// Read data from file
Dataset<Row> inputDataFrame = sparkSession.readStream()
.schema(readSchema)
.parquet(path);
// Convert the timestamp column into a proper timestamp type: the cast
// interprets the numeric value as seconds since the epoch, so divide the
// millisecond value by 1000 first (the fractional part preserves the millis)
Dataset<Row> df1 = inputDataFrame.withColumn("new_timestamp", expr("timestamp/1000").cast(DataTypes.TimestampType));
df1.show(false);
+-----+--------+-------------+-----------------------+
|value|deviceId|timestamp |new_timestamp |
+-----+--------+-------------+-----------------------+
|1 |1337 |1618836775397|2021-04-19 14:52:55.397|
+-----+--------+-------------+-----------------------+
df1.printSchema();
root
|-- value: integer (nullable = true)
|-- deviceId: long (nullable = true)
|-- timestamp: long (nullable = true)
|-- new_timestamp: timestamp (nullable = true)
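With new_timestamp in place, the 1-minute windowed sum from the question can be expressed against that column. A minimal end-to-end sketch (assuming import static org.apache.spark.sql.functions.*; the 5-minute watermark, console sink, and update output mode are illustrative assumptions, not part of the original answer):
// Sum the values per device over 1-minute event-time windows,
// keyed on the converted new_timestamp column.
// The watermark delay, sink, and output mode are illustrative choices.
Dataset<Row> aggregations = df1
        .withWatermark("new_timestamp", "5 minutes")
        .groupBy(window(col("new_timestamp"), "1 minutes"), col("deviceId"))
        .agg(sum("value"));

StreamingQuery query = aggregations.writeStream()
        .outputMode("update")
        .format("console")
        .start();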