Spark Structured Streaming exception handling
I am reading data from an MQTT streaming source using the Spark Structured Streaming API.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "Employee")
  .option("username", "username")
  .option("password", "passwork")
  .option("clientId", "employee11")
  .load("tcp://localhost:8000").as[(String, Timestamp)]
I convert the streaming data into the case class Employee:
case class Employee(Name: String, Department: String)
val ds = lines.map { row =>
  implicit val format = DefaultFormats
  parse(row._1).extract[Employee]
}
....some transformations
df.writeStream
  .outputMode("append")
  .format("es")
  .option("es.resource", "spark/employee")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()
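Now there are some messages in the queue whose structure differs from the Employee case class. Say some required columns are missing. My streaming job then fails with a field-not-found exception.
I want to handle such exceptions and also send an alert notification for them. I tried adding a try/catch block: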
case class ErrorMessage(row: String)
catch {
  case e: Exception =>
    val ds = lines.map { row =>
      implicit val format = DefaultFormats
      parse(row._1).extract[ErrorMessage]
    }
    val error = lines.foreach(row => {
      sendErrorMail(row._1)
    })
}
}
This throws: Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
mqtt
Any help would be appreciated.
I created a foreach sink in the catch block, and with it I was able to handle the exception and send the mail alert.
catch {
  case e: Exception =>
    val foreachWriter = new ForeachWriter[Row] {
      // ForeachWriter.open takes Long arguments, not Timestamp
      override def open(partitionId: Long, version: Long): Boolean = {
        true
      }
      override def process(value: Row): Unit = {
        // code for sending mail ...
      }
      override def close(errorOrNull: Throwable): Unit = {}
    }
    val df = lines.selectExpr("cast (value as string) as json")
    df.writeStream
      .foreach(foreachWriter)
      .outputMode("append")
      .start()
      .awaitTermination()
}
I think you should use the object returned by the start() method, as described in the Spark streaming doc. Something like:
val query = df.writeStream. ... .start()
try {
  // If the query has terminated with an exception, then the exception will be thrown.
  query.awaitTermination()
} catch {
  case ex: Exception => /* code to send mail */
}
Implementing your own foreach sink incurs the overhead of frequently opening and closing connections.
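A possible alternative to both approaches is to catch the parse failure per record, so that a malformed message never fails the query in the first place. The following is only a minimal sketch in plain Scala, not a tested Spark job: `safeParse` and `toyParser` are hypothetical names introduced here, and `toyParser` is a stand-in for a "Name,Department" payload so that the example has no dependencies. In the job from the question, the parser passed in would be the json4s call `parse(_).extract[Employee]`.

```scala
import scala.util.{Failure, Success, Try}

// Same shape as the Employee case class in the question.
final case class Employee(Name: String, Department: String)

object SafeParsing {
  // Hypothetical helper: wraps any parser so that a malformed record becomes
  // Left((raw payload, error message)) instead of an exception.
  def safeParse[A](raw: String)(parser: String => A): Either[(String, String), A] =
    Try(parser(raw)) match {
      case Success(value) => Right(value)
      case Failure(err)   => Left((raw, String.valueOf(err.getMessage)))
    }

  // Hypothetical stand-in parser for "Name,Department" payloads. It throws when
  // a required field is missing, which mimics the failure described above.
  def toyParser(raw: String): Employee = raw.split(",", -1) match {
    case Array(name, dept) if name.nonEmpty && dept.nonEmpty => Employee(name, dept)
    case _ => throw new IllegalArgumentException(s"missing required field in: $raw")
  }

  def main(args: Array[String]): Unit = {
    val payloads = Seq("Alice,Sales", "Bob", "Carol,HR")
    val parsed = payloads.map(p => safeParse(p)(toyParser))

    // Well-formed records continue through the normal pipeline.
    val good = parsed.collect { case Right(employee) => employee }
    // Malformed payloads are kept aside, e.g. for an alert or a separate sink.
    val bad = parsed.collect { case Left((raw, _)) => raw }

    println(good)
    println(bad)
  }
}
```

Inside a Dataset the same idea applies within `lines.map`, but the result type needs an encoder. As far as I know `Either` has no built-in encoder, so an `Option[Employee]` or a flat case class with an extra error column is likely easier to use there. How the bad records are then reported (mail, separate sink) is left open.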