重启 Spark Structured Streaming Job 消耗数百万条 Kafka 消息并死掉
Restarting Spark Structured Streaming Job consumes Millions of Kafka messages and dies
我们有一个 Spark 流应用程序 运行ning on Spark 2.3.3
基本上,它打开了一个 Kafka Stream:
kafka_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "mykafka:9092") \
.option("subscribe", "mytopic") \
.load()
kafka 主题有 2 个分区。之后,有一些基本的过滤操作,一些 Python UDF 和一个列上的 explode() ,例如:
stream = apply_operations(kafka_stream)
其中 apply_operations 对数据进行所有处理。最后,我们想将流写入接收器,i。即:
stream.writeStream \
.format("our.java.sink.Class") \
.option("some-option", "value") \
.trigger(processingTime='15 seconds') \
.start()
为了让这个流操作永远运行,我们申请:
spark.streams.awaitAnyTermination()
最后。
到目前为止,还不错。一切 运行 都持续了好几天。但是由于网络问题,作业挂了几天,现在kafka流中有百万条消息等待捕获向上。
当我们使用 spark-submit 重新启动流数据作业时,第一个批次将太大并且需要很长时间才能完成。我们认为可能有一种方法可以通过一些参数来限制第一批的大小,但我们没有找到任何有用的方法。
我们尝试过:
spark.streaming.backpressure.enabled=true 以及 spark.streaming.backpressure.initialRate=2000 和 spark.streaming.kafka.maxRatePerPartition=1000 和 spark.streaming.receiver.maxrate=2000
将 spark.streaming.backpressure.pid.minrate 设置为较低的值也没有效果
设置选项("maxOffsetsPerTrigger", 10000)也没有效果
现在,我们重新启动管道后,迟早整个Spark Job会再次崩溃。我们不能简单地扩大用于 spark 作业的内存或内核。
我们是否遗漏了什么来控制在一个流批处理中处理的事件数量?
您在评论中写道您正在使用 spark-streaming-kafka-0-8_2.11 and that api version is not able to handle maxOffsetPerTrigger (or any other mechanism to reduce the number of consumed messages as far as I know) as it was only implemented for the newer api spark-streaming-kafka-0-10_2.11. This newer api also works with your kafka version 0.10.2.2 according to the documentation。
我们有一个 Spark 流应用程序 运行ning on Spark 2.3.3
基本上,它打开了一个 Kafka Stream:
kafka_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "mykafka:9092") \
.option("subscribe", "mytopic") \
.load()
kafka 主题有 2 个分区。之后,有一些基本的过滤操作,一些 Python UDF 和一个列上的 explode() ,例如:
stream = apply_operations(kafka_stream)
其中 apply_operations 对数据进行所有处理。最后,我们想将流写入接收器,i。即:
stream.writeStream \
.format("our.java.sink.Class") \
.option("some-option", "value") \
.trigger(processingTime='15 seconds') \
.start()
为了让这个流操作永远运行,我们申请:
spark.streams.awaitAnyTermination()
最后。
到目前为止,还不错。一切 运行 都持续了好几天。但是由于网络问题,作业挂了几天,现在kafka流中有百万条消息等待捕获向上。
当我们使用 spark-submit 重新启动流数据作业时,第一个批次将太大并且需要很长时间才能完成。我们认为可能有一种方法可以通过一些参数来限制第一批的大小,但我们没有找到任何有用的方法。
我们尝试过:
spark.streaming.backpressure.enabled=true 以及 spark.streaming.backpressure.initialRate=2000 和 spark.streaming.kafka.maxRatePerPartition=1000 和 spark.streaming.receiver.maxrate=2000
将 spark.streaming.backpressure.pid.minrate 设置为较低的值也没有效果
设置选项("maxOffsetsPerTrigger", 10000)也没有效果
现在,我们重新启动管道后,迟早整个Spark Job会再次崩溃。我们不能简单地扩大用于 spark 作业的内存或内核。
我们是否遗漏了什么来控制在一个流批处理中处理的事件数量?
您在评论中写道您正在使用 spark-streaming-kafka-0-8_2.11 and that api version is not able to handle maxOffsetPerTrigger (or any other mechanism to reduce the number of consumed messages as far as I know) as it was only implemented for the newer api spark-streaming-kafka-0-10_2.11. This newer api also works with your kafka version 0.10.2.2 according to the documentation。