Structured Streaming truncates Kafka timestamps to seconds
I am using Spark Structured Streaming to read from Kafka and want to include the Kafka timestamp with each message:
sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:10000")
  .option("subscribe", "topicname")
  .option("includeTimestamp", true)
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[(String, String, String, Long)]
When I look at the timestamp, it is truncated from milliseconds to seconds. Is there any way to recover the millisecond precision after the read?
I just gave this a quick try in IntelliJ with my local Kafka setup.
If by truncation you mean the three dots at the end of the timestamp field (as in the following output):
Batch: 1
-------------------------------------------
+-----+----+--------+--------------------+
|topic| key| value| timestamp|
+-----+----+--------+--------------------+
| test|null|test-123|2018-10-07 03:10:...|
| test|null|test-234|2018-10-07 03:10:...|
+-----+----+--------+--------------------+
then you just need to add the line
.option("truncate", false)
to your writeStream() part, like this:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQueryException;

Dataset<Row> df = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("includeTimestamp", "true")
    .load()
    .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)");

try {
    df.writeStream()
      .outputMode("append")
      .format("console")
      .option("truncate", false)  // keep full column values in the console output
      .start()
      .awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}
With this change I get the full timestamps in the output:
Batch: 1
-------------------------------------------
+-----+----+--------+-----------------------+
|topic|key |value |timestamp |
+-----+----+--------+-----------------------+
|test |null|test-123|2018-10-07 03:19:50.677|
|test |null|test-234|2018-10-07 03:19:52.673|
+-----+----+--------+-----------------------+
Hope this helps.
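Note that truncate only controls how the console sink renders wide columns (by default it cuts them off, which produces the "..."); the timestamp data itself is not changed. Since the question uses the Scala API, the same change on the Scala side would look roughly like the sketch below, where df stands for the streaming Dataset built by the readStream chain in the question:

// Sketch: the same console-sink option from the Scala API.
// `df` is assumed to be the streaming Dataset from the question's readStream chain.
df.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false) // print full column values instead of cutting them off
  .start()
  .awaitTermination()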
The truncation happens because the timestamp is read as a Long value. That happens in the last line here:
sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:10000")
  .option("subscribe", "topicname")
  .option("includeTimestamp", true)
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[(String, String, String, Long)]
It is not truncated if you change the last line to:
.as[(String, String, String, Timestamp)]
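Putting it together, a minimal sketch of the corrected read (using the question's placeholder broker and topic, and assuming sparkSession is already in scope): java.sql.Timestamp keeps the millisecond part, and its getTime method recovers epoch milliseconds if a plain Long is still needed downstream.

import java.sql.Timestamp

import sparkSession.implicits._

val messages = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:10000")
  .option("subscribe", "topicname")
  .option("includeTimestamp", true)
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[(String, String, String, Timestamp)] // Timestamp instead of Long keeps the millisecond part

// If a numeric value is needed, getTime returns milliseconds since the epoch.
val withMillis = messages.map { case (topic, key, value, ts) => (topic, key, value, ts.getTime) }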