Spark 中未捕获的异常处理
Uncaught Exception Handling in Spark
我正在开发一个基于 Java 的 Spark Streaming 应用程序,它响应来自 Kafka 主题的消息。对于每条消息,应用程序都会进行一些处理,并将结果写回不同的 Kafka 主题。
有时由于意外的数据相关问题,操作 RDD 的代码可能会失败并抛出异常。当发生这种情况时,我希望有一个通用处理程序可以采取必要的操作并将消息发送到错误主题。现在,这些异常是由Spark自己写在Spark的日志中的。
最好的方法是什么,而不是为在 RDD 上工作的每个代码块编写 try-catch 块?
您可以编写一个通用函数来执行此操作。您只需要将它包装在 RDD 操作周围,因为它们是唯一可以抛出 Spark 异常的操作(像 .map
和 .filter
这样的转换器是由操作延迟执行的)。
(假设这是在 Scala 中)您甚至可以尝试使用隐含函数。创建一个包含 RDD 并处理错误的 class。这里有一个可能看起来像的草图:
implicit class FailSafeRDD[T](rdd: RDD[T]) {
def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try {
fn(rdd)
}
}
您可以将错误主题消息添加到 failsafeAction
或您每次失败时想做的任何事情。然后用法可能是这样的:
val rdd = ??? // Some rdd you already have
val resultOrException = rdd.failsafeAction { r => r.count() }
除此之外,我认为 "best" 方法对应用程序需求有些主观。
我想你也可以用 try catch 实现这个 =>
dstream.foreachRDD { case rdd: RDD[String] =>
rdd.foreach { case string: String =>
try {
val kafkaProducer = ...
val msg = ...
kafkaProducer.send(msg)
} catch {
case d: DataException=>
val kafkaErrorProducer = ...
val errorMsg = ...
kafkaErrorProducer.send(errorMsg )
case t: Throwable =>
//further error handling
}
}
}
我正在开发一个基于 Java 的 Spark Streaming 应用程序,它响应来自 Kafka 主题的消息。对于每条消息,应用程序都会进行一些处理,并将结果写回不同的 Kafka 主题。
有时由于意外的数据相关问题,操作 RDD 的代码可能会失败并抛出异常。当发生这种情况时,我希望有一个通用处理程序可以采取必要的操作并将消息发送到错误主题。现在,这些异常是由Spark自己写在Spark的日志中的。
最好的方法是什么,而不是为在 RDD 上工作的每个代码块编写 try-catch 块?
您可以编写一个通用函数来执行此操作。您只需要将它包装在 RDD 操作周围,因为它们是唯一可以抛出 Spark 异常的操作(像 .map
和 .filter
这样的转换器是由操作延迟执行的)。
(假设这是在 Scala 中)您甚至可以尝试使用隐含函数。创建一个包含 RDD 并处理错误的 class。这里有一个可能看起来像的草图:
implicit class FailSafeRDD[T](rdd: RDD[T]) {
def failsafeAction[U](fn: RDD[T] => U): Try[U] = Try {
fn(rdd)
}
}
您可以将错误主题消息添加到 failsafeAction
或您每次失败时想做的任何事情。然后用法可能是这样的:
val rdd = ??? // Some rdd you already have
val resultOrException = rdd.failsafeAction { r => r.count() }
除此之外,我认为 "best" 方法对应用程序需求有些主观。
我想你也可以用 try catch 实现这个 =>
dstream.foreachRDD { case rdd: RDD[String] =>
rdd.foreach { case string: String =>
try {
val kafkaProducer = ...
val msg = ...
kafkaProducer.send(msg)
} catch {
case d: DataException=>
val kafkaErrorProducer = ...
val errorMsg = ...
kafkaErrorProducer.send(errorMsg )
case t: Throwable =>
//further error handling
}
}
}