使用 PySpark 检测 StreamingContext 是否空闲

Detect if a StreamingContext is idle with PySpark

StreamingContext 函数提供了 awaitTermination(timeout=None)awaitTerminationOrTimeout(timeout) 方法,它们都需要外部进程来停止上下文。

是否有任何机制可以应用于允许在空闲一段时间后自行终止的 StreamingContext 作业?我的意思是该超时没有来自流媒体源的数据。

您可以跟踪状态并在满足特定条件时执行 ssc.stop() 但这并不是一个很好的解决方案:

from pyspark.streaming import StreamingContext

def counter(ssc, n=10):
    cnt = {"cnt": 0}
    def _counter(rdd):
        if rdd.isEmpty():
            cnt["cnt"] += 1
        else:
            cnt["cnt"] = 0
        if cnt["cnt"] >= n:
            ssc.stop()
    return _counter

ssc = StreamingContext(sc, 1)
cnt = counter(ssc, 5)

stream = ...  # Some DStream
stream.foreachRDD(cnt)

ssc.start()
ssc.awaitTermination()