使用 PySpark 检测 StreamingContext 是否空闲
Detect if a StreamingContext is idle with PySpark
StreamingContext 函数提供了 awaitTermination(timeout=None)
和 awaitTerminationOrTimeout(timeout)
方法,它们都需要外部进程来停止上下文。
是否有任何机制可以应用于允许在空闲一段时间后自行终止的 StreamingContext 作业?我的意思是该超时没有来自流媒体源的数据。
您可以跟踪状态并在满足特定条件时执行 ssc.stop()
但这并不是一个很好的解决方案:
from pyspark.streaming import StreamingContext
def counter(ssc, n=10):
cnt = {"cnt": 0}
def _counter(rdd):
if rdd.isEmpty():
cnt["cnt"] += 1
else:
cnt["cnt"] = 0
if cnt["cnt"] >= n:
ssc.stop()
return _counter
ssc = StreamingContext(sc, 1)
cnt = counter(ssc, 5)
stream = ... # Some DStream
stream.foreachRDD(cnt)
ssc.start()
ssc.awaitTermination()
StreamingContext 函数提供了 awaitTermination(timeout=None)
和 awaitTerminationOrTimeout(timeout)
方法,它们都需要外部进程来停止上下文。
是否有任何机制可以应用于允许在空闲一段时间后自行终止的 StreamingContext 作业?我的意思是该超时没有来自流媒体源的数据。
您可以跟踪状态并在满足特定条件时执行 ssc.stop()
但这并不是一个很好的解决方案:
from pyspark.streaming import StreamingContext
def counter(ssc, n=10):
cnt = {"cnt": 0}
def _counter(rdd):
if rdd.isEmpty():
cnt["cnt"] += 1
else:
cnt["cnt"] = 0
if cnt["cnt"] >= n:
ssc.stop()
return _counter
ssc = StreamingContext(sc, 1)
cnt = counter(ssc, 5)
stream = ... # Some DStream
stream.foreachRDD(cnt)
ssc.start()
ssc.awaitTermination()