如何在 Spark Structured Streaming 中指定批处理间隔?
How to specify batch interval in Spark Structured Streaming?
我正在使用 Spark Structured Streaming 时遇到了问题。
在StreamingContext、DStreams中,我们可以定义一个batch interval如下:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval
如何在结构化流中执行此操作?
我的流媒体是这样的:
sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()
stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")
sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()
此代码按预期工作,但是如何to/where 在这里定义批处理间隔?
我是结构化流媒体的新手,请指导我。
tl;dr 使用 trigger(...)
(在 DataStreamWriter
上,即在 writeStream
之后)
这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html。
有多种选择,如果不设置批间隔,Spark会在处理完最后一批后立即查找数据。触发器是这里的去向。
来自手册:
The trigger settings of a streaming query defines the timing of
streaming data processing, whether the query is going to executed as
micro-batch query with a fixed batch interval or as a continuous
processing query.
一些示例:
默认触发器(尽快运行微批处理)
df.writeStream \
.format("console") \
.start()
具有两秒微批处理间隔的 ProcessingTime 触发器
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
一次性触发
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
具有一秒检查点间隔的连续触发器
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()
我正在使用 Spark Structured Streaming 时遇到了问题。
在StreamingContext、DStreams中,我们可以定义一个batch interval如下:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5) # 5 second batch interval
如何在结构化流中执行此操作?
我的流媒体是这样的:
sparkStreaming = SparkSession \
.builder \
.appName("StreamExample1") \
.getOrCreate()
stream_df = sparkStreaming.readStream.schema("col0 STRING, col1 INTEGER").option("maxFilesPerTrigger", 1).\
csv("C:/sparkStream")
sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()
此代码按预期工作,但是如何to/where 在这里定义批处理间隔?
我是结构化流媒体的新手,请指导我。
tl;dr 使用 trigger(...)
(在 DataStreamWriter
上,即在 writeStream
之后)
这是一个很好的来源https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html。
有多种选择,如果不设置批间隔,Spark会在处理完最后一批后立即查找数据。触发器是这里的去向。
来自手册:
The trigger settings of a streaming query defines the timing of streaming data processing, whether the query is going to executed as micro-batch query with a fixed batch interval or as a continuous processing query.
一些示例:
默认触发器(尽快运行微批处理)
df.writeStream \
.format("console") \
.start()
具有两秒微批处理间隔的 ProcessingTime 触发器
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
一次性触发
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
具有一秒检查点间隔的连续触发器
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()