结构化流的检查点周期多久一次,是否可配置?
How often is the checkpoint period for structured streaming, and is it configurable?
我正在为我们的数据转发工作从 Spark 批处理切换到结构化流处理做准备。我们使用一个 Kafka source 和一个由 socket 连接组成的 foreach
sink。
对于批处理流,我尝试通过将偏移量存储在从套接字返回的每个 ACK 上的 zookeeper 中来强制执行一次语义,但是由于我们的偏移量,它很可能每周几次出现生产吞吐量中断管理。我现在已经从经常发帖的人 Jacek Laskowski 那里注意到偏移量管理:
You simply should not be dealing with this low-level "thing" called offsets that Spark Structured Streaming uses to offer exactly once guarantees.
我知道由于套接字不是幂等的,我们不能通过 HDFS 检查点来保证 exactly once 语义。我读过,对于结构化,偏移量将在每个触发时被检查点,但在没有检查点的试验 运行 中,我每 25 毫秒看到一次触发持续时间。
结构化流真的能够每 25 毫秒存储一次偏移量吗?从结构化流的角度来看,这个检查点周期是否可以配置?请记住,我还没有在我们的 spark worker 上安装 HDFS,所以如果它是 HDFS 方面的简单配置,那么我对这个长问题表示歉意:)
您可以配置触发频率如下:
import org.apache.spark.sql.streaming.Trigger
val query = resultTable
.writeStream
.outputMode(OutputMode.Update())
.option("checkpointLocation", "hdfs://path/to/checkpoints")
.trigger(Trigger.ProcessingTime(10.seconds))
.foreach(writer)
.start()
query.awaitTermination()
我正在为我们的数据转发工作从 Spark 批处理切换到结构化流处理做准备。我们使用一个 Kafka source 和一个由 socket 连接组成的 foreach
sink。
对于批处理流,我尝试通过将偏移量存储在从套接字返回的每个 ACK 上的 zookeeper 中来强制执行一次语义,但是由于我们的偏移量,它很可能每周几次出现生产吞吐量中断管理。我现在已经从经常发帖的人 Jacek Laskowski 那里注意到偏移量管理:
You simply should not be dealing with this low-level "thing" called offsets that Spark Structured Streaming uses to offer exactly once guarantees.
我知道由于套接字不是幂等的,我们不能通过 HDFS 检查点来保证 exactly once 语义。我读过,对于结构化,偏移量将在每个触发时被检查点,但在没有检查点的试验 运行 中,我每 25 毫秒看到一次触发持续时间。
结构化流真的能够每 25 毫秒存储一次偏移量吗?从结构化流的角度来看,这个检查点周期是否可以配置?请记住,我还没有在我们的 spark worker 上安装 HDFS,所以如果它是 HDFS 方面的简单配置,那么我对这个长问题表示歉意:)
您可以配置触发频率如下:
import org.apache.spark.sql.streaming.Trigger
val query = resultTable
.writeStream
.outputMode(OutputMode.Update())
.option("checkpointLocation", "hdfs://path/to/checkpoints")
.trigger(Trigger.ProcessingTime(10.seconds))
.foreach(writer)
.start()
query.awaitTermination()