如何使用 Python 查看 Spark Structured Streaming 中的特定指标
How to see a particular metric in Spark Structured Streaming with Python
我对 Spark 和 Python 还很陌生。我正在尝试查看 Spark Structured Streaming 中的任何指标(例如,processedRowsPerSecond
),但我不知道该怎么做。
我在“Structured Streaming Programming Guide”中读到,使用 print(query.lastProgress)
您可以直接获取活动查询的当前状态和指标,但是如果我写它我只获得None
一次。我的代码的最后一部分如下:
query = windowedCountsDF\
.writeStream\
.outputMode('update')\
.option("truncate", "false") \
.format('console') \
.queryName("numbers") \
.start()
print(query.lastProgress)
query.awaitTermination()
任何关于如何做到这一点的想法将不胜感激。
这实际上取决于您想对该指标做什么。您的问题是您正在调用 query.awaitTermination()
,它会阻止任何其他 activity。如果你想收集指标,那么你需要实现自己的等待循环而不是调用 query.awaitTermination()
,就像这样:
query = ...
while not query.exception():
if query.lastProgress:
print(query.lastProgress) # do something with your data
time.sleep(10) # wait 10 seconds..
试试:
while query.isActive:
print("\n")
print(query.status)
print(query.lastProgress)
time.sleep(30)
query.awaitTermination()
块 query.lastProgress
.
我对 Spark 和 Python 还很陌生。我正在尝试查看 Spark Structured Streaming 中的任何指标(例如,processedRowsPerSecond
),但我不知道该怎么做。
我在“Structured Streaming Programming Guide”中读到,使用 print(query.lastProgress)
您可以直接获取活动查询的当前状态和指标,但是如果我写它我只获得None
一次。我的代码的最后一部分如下:
query = windowedCountsDF\
.writeStream\
.outputMode('update')\
.option("truncate", "false") \
.format('console') \
.queryName("numbers") \
.start()
print(query.lastProgress)
query.awaitTermination()
任何关于如何做到这一点的想法将不胜感激。
这实际上取决于您想对该指标做什么。您的问题是您正在调用 query.awaitTermination()
,它会阻止任何其他 activity。如果你想收集指标,那么你需要实现自己的等待循环而不是调用 query.awaitTermination()
,就像这样:
query = ...
while not query.exception():
if query.lastProgress:
print(query.lastProgress) # do something with your data
time.sleep(10) # wait 10 seconds..
试试:
while query.isActive:
print("\n")
print(query.status)
print(query.lastProgress)
time.sleep(30)
query.awaitTermination()
块 query.lastProgress
.