Apache Spark Streaming,如何处理下游依赖失败
Apache Spark Streaming, How to handle Downstream dependency failures
我正在尝试了解如何使 Spark Streaming 应用程序更具容错性(特别是在尝试写入下游依赖项时),但我不知道处理尝试写入失败的最佳方法是什么结果到外部源,如 Cassandra、DynamoDB 等。
例如,我有一个 Spark Streaming 作业,它从 Stream(Kafka、Flume 等……我还没有最终确定要使用哪种技术)中提取数据,将类似的项目聚合在一起,然后将结果写入外部存储。 (即 Cassandra、DynamoDB 或接收我的 DStream 计算结果的任何东西)。
我想弄明白我是如何处理外部依赖不可写的情况的。也许集群宕机了,也许有权限问题等等,但我的工作无法将结果写入外部依赖。有没有办法暂停 Spark Streaming,以便接收方不会继续批处理数据?我应该只休眠当前批次并让接收器继续存储批次吗?如果问题是暂时的(几秒钟),继续批处理可能是可以接受的,但是如果依赖性下降几分钟或 1 个多小时会怎样?
我曾想过要有一个监视进程在后台监视依赖项的运行状况,如果它发现它是 "unhealthy",它就会停止工作。然后,当所有依赖关系都正常时,我可以开始备份作业并处理所有未写入外部源的数据。
我的另一个想法是以某种方式在 DStream forEachRdd 方法中发出信号,表明存在问题。我可以在 DStream 中抛出某种异常,它会向驱动程序发信号通知它应该停止吗?
如果有人有任何关于如何处理外部容错的经验,或者可以指出articles/videos,那就太好了。
谢谢
现在我用的是direct stream,自己保存offsets。这可能无法解决您的问题,至少一旦您发现外部存储出现问题,您可以从停止的地方重新开始。
我相信这里没有简单通用的答案。很大程度上取决于应用程序语义、数据源类型(可靠接收器、可靠接收器、基于文件、无接收器)和要求。
一般来说,您永远不应该让应用程序故障转移单个 IO 故障。假设你有一些行动:
outputAction[T](rdd: RDD[T]): Unit = ???
至少要确保它不会将异常传播给您的驱动程序。
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD { rdd => Try(outputAction(rdd)) }
问题仍然是下一步。您可以做的最简单的事情就是放弃 given window。根据应用程序,它可能是可接受的解决方案,但通常在许多情况下丢失一些数据是完全可以接受的。
可以通过跟踪故障并在达到某个阈值时采取其他措施来进一步改进。
如果丢弃数据不可接受,下一步是在延迟一段时间后重试:
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) }
.recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) }
...
}
重试次数和延迟持续时间因情况而异,取决于来源和存储传入数据的能力。
当我们点击最后一次重试时,你能做什么?对于初学者,我们可以添加一个替代输出源。例如,您可以将所有内容推送到可靠的外部文件存储,而不是使用主要来源,以后再担心。如果输出源需要输入数据的特定顺序,这可能不适用,否则应该值得尝试。
alternativeOutputAction[T](rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)
...
.recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) }
.recoverWith { case _ => Try(alternativeOutputAction(rdd))
}
如果失败,则可能是严重问题的征兆,我们在应用程序级别无能为力。我们可以回到第一种方法,只是希望情况能尽快解决或选择更复杂的方法。
如果输入源可以缓冲数据并且我们使用可靠的存储和复制,那么我们可以enable checkpointing并简单地终止应用程序。
如果您尝试恢复,添加 CircuitBreaker 的一些变体可能是个好主意,并且如果应用程序遇到多次失败,试图立即进行主输出丢弃恢复尝试。
我正在尝试了解如何使 Spark Streaming 应用程序更具容错性(特别是在尝试写入下游依赖项时),但我不知道处理尝试写入失败的最佳方法是什么结果到外部源,如 Cassandra、DynamoDB 等。
例如,我有一个 Spark Streaming 作业,它从 Stream(Kafka、Flume 等……我还没有最终确定要使用哪种技术)中提取数据,将类似的项目聚合在一起,然后将结果写入外部存储。 (即 Cassandra、DynamoDB 或接收我的 DStream 计算结果的任何东西)。
我想弄明白我是如何处理外部依赖不可写的情况的。也许集群宕机了,也许有权限问题等等,但我的工作无法将结果写入外部依赖。有没有办法暂停 Spark Streaming,以便接收方不会继续批处理数据?我应该只休眠当前批次并让接收器继续存储批次吗?如果问题是暂时的(几秒钟),继续批处理可能是可以接受的,但是如果依赖性下降几分钟或 1 个多小时会怎样?
我曾想过要有一个监视进程在后台监视依赖项的运行状况,如果它发现它是 "unhealthy",它就会停止工作。然后,当所有依赖关系都正常时,我可以开始备份作业并处理所有未写入外部源的数据。
我的另一个想法是以某种方式在 DStream forEachRdd 方法中发出信号,表明存在问题。我可以在 DStream 中抛出某种异常,它会向驱动程序发信号通知它应该停止吗?
如果有人有任何关于如何处理外部容错的经验,或者可以指出articles/videos,那就太好了。
谢谢
现在我用的是direct stream,自己保存offsets。这可能无法解决您的问题,至少一旦您发现外部存储出现问题,您可以从停止的地方重新开始。
我相信这里没有简单通用的答案。很大程度上取决于应用程序语义、数据源类型(可靠接收器、可靠接收器、基于文件、无接收器)和要求。
一般来说,您永远不应该让应用程序故障转移单个 IO 故障。假设你有一些行动:
outputAction[T](rdd: RDD[T]): Unit = ???
至少要确保它不会将异常传播给您的驱动程序。
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD { rdd => Try(outputAction(rdd)) }
问题仍然是下一步。您可以做的最简单的事情就是放弃 given window。根据应用程序,它可能是可接受的解决方案,但通常在许多情况下丢失一些数据是完全可以接受的。
可以通过跟踪故障并在达到某个阈值时采取其他措施来进一步改进。
如果丢弃数据不可接受,下一步是在延迟一段时间后重试:
outputActionWithDelay[T](d: Duration)(rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)(rdd)) }
.recoverWith { case _ => Try(outputActionWithDelay(d2)(rdd)) }
...
}
重试次数和延迟持续时间因情况而异,取决于来源和存储传入数据的能力。
当我们点击最后一次重试时,你能做什么?对于初学者,我们可以添加一个替代输出源。例如,您可以将所有内容推送到可靠的外部文件存储,而不是使用主要来源,以后再担心。如果输出源需要输入数据的特定顺序,这可能不适用,否则应该值得尝试。
alternativeOutputAction[T](rdd: RDD[T]) = ???
stream foreachRDD {
rdd => Try(outputAction(rdd))
.recoverWith { case _ => Try(outputActionWithDelay(d1)
...
.recoverWith { case _ => Try(outputActionWithDelay(dn)(rdd)) }
.recoverWith { case _ => Try(alternativeOutputAction(rdd))
}
如果失败,则可能是严重问题的征兆,我们在应用程序级别无能为力。我们可以回到第一种方法,只是希望情况能尽快解决或选择更复杂的方法。
如果输入源可以缓冲数据并且我们使用可靠的存储和复制,那么我们可以enable checkpointing并简单地终止应用程序。
如果您尝试恢复,添加 CircuitBreaker 的一些变体可能是个好主意,并且如果应用程序遇到多次失败,试图立即进行主输出丢弃恢复尝试。