使用 Kafka 进行 Spark 流式处理:恢复表单检查点时,所有数据仅在一个微批处理中处理
Spark streaming with Kafka: when recovering form checkpointing all data are processed in only one micro batch
我是 运行 一个从 Kafka 读取数据的 Spark Streaming 应用程序。
我已激活检查点以在失败时恢复作业。
问题在于,如果应用程序失败,当它重新启动时,它会尝试仅在一个微批中执行失败点的所有数据。
这意味着如果一个微批通常从 Kafka 接收 10.000 个事件,如果它失败并在 10 分钟后重新启动,它将必须处理一个 100.000 个事件的微批。
现在,如果我希望通过检查点恢复成功,我必须分配比正常情况下更多的内存。
重新启动时,Spark Streaming 尝试立即执行检查点中的所有过去事件是否正常,还是我做错了什么?
非常感谢。
如果你的应用在故障恢复后发现难以在一个微批处理所有事件,你可以提供spark.streaming.kafka.maxRatePerPartition
配置为spark-conf,在spark-defaults.conf或里面你的申请。
即如果您相信您的 system/app 可以安全地处理每 分钟 秒 10K 个事件,并且您的 kafka 主题有 2 个分区,请将此行添加到 spark-defaults.conf
spark.streaming.kafka.maxRatePerPartition 5000
或将其添加到您的代码中:
val conf = new SparkConf()
conf.set("spark.streaming.kafka.maxRatePerPartition", "5000")
此外,我建议您将此数字设置得更高一些并启用背压。这将尝试以不会破坏流式传输应用程序稳定性的速率传输数据。
conf.set("spark.streaming.backpressure.enabled","true")
更新:有一个错误,配置是每秒钟的秒数而不是每分钟。
我是 运行 一个从 Kafka 读取数据的 Spark Streaming 应用程序。 我已激活检查点以在失败时恢复作业。
问题在于,如果应用程序失败,当它重新启动时,它会尝试仅在一个微批中执行失败点的所有数据。 这意味着如果一个微批通常从 Kafka 接收 10.000 个事件,如果它失败并在 10 分钟后重新启动,它将必须处理一个 100.000 个事件的微批。
现在,如果我希望通过检查点恢复成功,我必须分配比正常情况下更多的内存。
重新启动时,Spark Streaming 尝试立即执行检查点中的所有过去事件是否正常,还是我做错了什么?
非常感谢。
如果你的应用在故障恢复后发现难以在一个微批处理所有事件,你可以提供spark.streaming.kafka.maxRatePerPartition
配置为spark-conf,在spark-defaults.conf或里面你的申请。
即如果您相信您的 system/app 可以安全地处理每 分钟 秒 10K 个事件,并且您的 kafka 主题有 2 个分区,请将此行添加到 spark-defaults.conf
spark.streaming.kafka.maxRatePerPartition 5000
或将其添加到您的代码中:
val conf = new SparkConf()
conf.set("spark.streaming.kafka.maxRatePerPartition", "5000")
此外,我建议您将此数字设置得更高一些并启用背压。这将尝试以不会破坏流式传输应用程序稳定性的速率传输数据。
conf.set("spark.streaming.backpressure.enabled","true")
更新:有一个错误,配置是每秒钟的秒数而不是每分钟。