Spark structured streaming - UNION 两个或多个流媒体源
Spark structured streaming - UNION two or more streaming sources
我正在使用 spark 2.3.2 和 运行 解决一个问题,即对来自 Kafka 的 2 个或更多流媒体源进行联合。其中每一个都是来自 Kafka 的流媒体源,我已经转换并存储在 Dataframes 中。
理想情况下,我希望将此 UNIONed 数据帧的结果以镶木地板格式存储在 HDFS 中,甚至可能存储回 Kafka。最终目标是以尽可能低的延迟存储这些合并事件。
val finalDF = flatDF1
.union(flatDF2)
.union(flatDF3)
val query = finalDF.writeStream
.format("parquet")
.outputMode("append")
.option("path", hdfsLocation)
.option("checkpointLocation", checkpointLocation)
.option("failOnDataLoss", false)
.start()
query.awaitTermination()
当对控制台而不是镶木地板执行 writeStream 时,我得到了预期的结果,但上面的示例导致断言失败。
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:124)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
这里是 class 和失败的断言:
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
assert(sources.size == offsets.size)
这是因为检查点只存储其中一个数据帧的偏移量吗?查看 Spark Structured Streaming 文档,看起来可以在 Spark 2.2 或 >
中执行 joins/union of streaming sources
首先,请定义您的案例 class OffsetSeq 与数据帧并集的代码的关系。
接下来,当执行这个联合然后使用 writestream 写入 Kafka 时,检查点是一个真正的问题。分成多个写入流 - 每个写入流都有自己的检查点 - 由于联合操作而混淆了批次 ID。使用具有数据帧联合的相同 writestream 会因检查点失败而失败,因为检查点似乎会寻找在联合之前生成数据帧的所有模型,并且无法区分 row/record 来自 dataframe/model.
为了从结构化 sql 流式联合数据帧写入 Kafka - 最好将 writestream 与 foreach 和 ForEachWriter 一起使用,包括进程方法中的 Kafka Producer。不需要检查点;应用程序仅使用临时检查点文件,这些文件设置为在适当时删除 - 在会话构建器中将“forceDeleteTempCheckpointLocation”设置为 true。
无论如何,我刚刚设置了 scala 代码来合并任意数量的流数据帧,然后写入 Kafka Producer。一旦将所有 Kafka Producer 代码放入 ForEachWriter process 方法以便它可以被 Spark 序列化,似乎就可以正常工作。
val output = dataFrameModelArray.reduce(_ union _)
val stream: StreamingQuery = output
.writeStream.foreach(new ForeachWriter[Row] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(row: Row): Unit = {
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String](producerTopic, row.getString(0), row.getString(1))
producer.send(record)
}
def close(errorOrNull: Throwable): Unit = {
}
}
).start()
如果需要,可以在 process 方法中添加更多逻辑。
注意在合并之前,所有要合并的数据帧都已转换为键、值字符串列。值是要通过 Kafka Producer 发送的消息数据的 json 字符串。这对于在尝试并集之前进行写入也非常重要。
svcModel.transform(query)
.select($"key", $"uuid", $"currentTime", $"label", $"rawPrediction", $"prediction")
.selectExpr("key", "to_json(struct(*)) AS value")
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
其中 svcModel 是 dataFrameModelArray 中的数据框。
我正在使用 spark 2.3.2 和 运行 解决一个问题,即对来自 Kafka 的 2 个或更多流媒体源进行联合。其中每一个都是来自 Kafka 的流媒体源,我已经转换并存储在 Dataframes 中。
理想情况下,我希望将此 UNIONed 数据帧的结果以镶木地板格式存储在 HDFS 中,甚至可能存储回 Kafka。最终目标是以尽可能低的延迟存储这些合并事件。
val finalDF = flatDF1
.union(flatDF2)
.union(flatDF3)
val query = finalDF.writeStream
.format("parquet")
.outputMode("append")
.option("path", hdfsLocation)
.option("checkpointLocation", checkpointLocation)
.option("failOnDataLoss", false)
.start()
query.awaitTermination()
当对控制台而不是镶木地板执行 writeStream 时,我得到了预期的结果,但上面的示例导致断言失败。
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.streaming.OffsetSeq.toStreamProgress(OffsetSeq.scala:42)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:124)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
这里是 class 和失败的断言:
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
assert(sources.size == offsets.size)
这是因为检查点只存储其中一个数据帧的偏移量吗?查看 Spark Structured Streaming 文档,看起来可以在 Spark 2.2 或 >
中执行 joins/union of streaming sources首先,请定义您的案例 class OffsetSeq 与数据帧并集的代码的关系。
接下来,当执行这个联合然后使用 writestream 写入 Kafka 时,检查点是一个真正的问题。分成多个写入流 - 每个写入流都有自己的检查点 - 由于联合操作而混淆了批次 ID。使用具有数据帧联合的相同 writestream 会因检查点失败而失败,因为检查点似乎会寻找在联合之前生成数据帧的所有模型,并且无法区分 row/record 来自 dataframe/model.
为了从结构化 sql 流式联合数据帧写入 Kafka - 最好将 writestream 与 foreach 和 ForEachWriter 一起使用,包括进程方法中的 Kafka Producer。不需要检查点;应用程序仅使用临时检查点文件,这些文件设置为在适当时删除 - 在会话构建器中将“forceDeleteTempCheckpointLocation”设置为 true。
无论如何,我刚刚设置了 scala 代码来合并任意数量的流数据帧,然后写入 Kafka Producer。一旦将所有 Kafka Producer 代码放入 ForEachWriter process 方法以便它可以被 Spark 序列化,似乎就可以正常工作。
val output = dataFrameModelArray.reduce(_ union _)
val stream: StreamingQuery = output
.writeStream.foreach(new ForeachWriter[Row] {
def open(partitionId: Long, version: Long): Boolean = {
true
}
def process(row: Row): Unit = {
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String](producerTopic, row.getString(0), row.getString(1))
producer.send(record)
}
def close(errorOrNull: Throwable): Unit = {
}
}
).start()
如果需要,可以在 process 方法中添加更多逻辑。
注意在合并之前,所有要合并的数据帧都已转换为键、值字符串列。值是要通过 Kafka Producer 发送的消息数据的 json 字符串。这对于在尝试并集之前进行写入也非常重要。
svcModel.transform(query)
.select($"key", $"uuid", $"currentTime", $"label", $"rawPrediction", $"prediction")
.selectExpr("key", "to_json(struct(*)) AS value")
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
其中 svcModel 是 dataFrameModelArray 中的数据框。