控制 Structured Spark Streaming 的微批处理
Control micro batch of Structured Spark Streaming
我正在从 Kafka 主题读取数据,并以分区模式将其放入 Azure ADLS(类似 HDFS)。
我的代码如下:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("failOnDataLoss", false)
.load()
.selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
.partitionBy("year", "month", "day", "hour", "minute")
.format("parquet")
.option("path", outputDirectory)
.option("checkpointLocation", checkpointDirectory)
.outputMode("append")
.start()
.awaitTermination()
我有大约 2000 records/sec,我的问题是 Spark 每 45 秒插入一次数据,我希望立即插入数据。
谁知道如何控制微批的大小?
从Spark 2.3版本开始,可以使用连续处理模式。在官方文档中。您可以看到此模式仅支持三个接收器,并且只有 Kafka 接收器已准备好用于生产,并且“end-to-endlow-latency 处理可以最好地观察 Kafka 作为源和下沉"
df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/0")
.option("topic", "output0")
.trigger(Trigger.Continuous("0 seconds"))
.start()
因此,目前看来,您不能使用连续模式将 HDFS 用作接收器。在您的情况下,也许您可以测试 Akka Streams 和 Alpakka connector
我正在从 Kafka 主题读取数据,并以分区模式将其放入 Azure ADLS(类似 HDFS)。
我的代码如下:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("failOnDataLoss", false)
.load()
.selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
.partitionBy("year", "month", "day", "hour", "minute")
.format("parquet")
.option("path", outputDirectory)
.option("checkpointLocation", checkpointDirectory)
.outputMode("append")
.start()
.awaitTermination()
我有大约 2000 records/sec,我的问题是 Spark 每 45 秒插入一次数据,我希望立即插入数据。
谁知道如何控制微批的大小?
从Spark 2.3版本开始,可以使用连续处理模式。在官方文档中。您可以看到此模式仅支持三个接收器,并且只有 Kafka 接收器已准备好用于生产,并且“end-to-endlow-latency 处理可以最好地观察 Kafka 作为源和下沉"
df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/0")
.option("topic", "output0")
.trigger(Trigger.Continuous("0 seconds"))
.start()
因此,目前看来,您不能使用连续模式将 HDFS 用作接收器。在您的情况下,也许您可以测试 Akka Streams 和 Alpakka connector