Spark Structured Streaming:将流与应该在每个微批次中读取的数据连接起来

Spark Structured Streaming: join stream with data that should be read every micro batch

我有一个来自 HDFS 的流,我需要将它与也在 HDFS 中的我的元数据结合起来,都是 Parquets。

我的元数据有时会更新,我需要加入最新的和最新的,这意味着理想情况下从 HDFS 读取每个流微批处理的元数据。

我试图对此进行测试,但不幸的是,即使我尝试使用 spark.sql.parquet.cacheMetadata=false.

,Spark 也会在缓存文件(据推测)后读取元数据

有没有办法读取每个微批次? Foreach Writer 不是我要找的?

代码示例如下:

spark.sql("SET spark.sql.streaming.schemaInference=true")

spark.sql("SET spark.sql.parquet.cacheMetadata=false")

val stream = spark.readStream.parquet("/tmp/streaming/")

val metadata = spark.read.parquet("/tmp/metadata/")

val joinedStream = stream.join(metadata, Seq("id"))

joinedStream.writeStream.option("checkpointLocation", "/tmp/streaming-test/checkpoint").format("console").start()



/tmp/metadata/ got updated with spark append mode.

据我了解,通过 JDBC 访问元数据,Spark 将查询每个微批次。

据我所知,有两种选择:

  1. 创建临时视图并使用间隔刷新:

    metadata.createOrReplaceTempView("metadata")

并在单独的线程中触发刷新:

spark.catalog.refreshTable("metadata")

注意:在这种情况下,spark 将只读取相同的路径,如果您需要从 HDFS 上的不同文件夹读取元数据,则它不起作用,例如带有时间戳等

  1. 重启流,间隔为Tathagata Das suggested

这种方式不适合我,因为我的元数据可能每小时刷新几次。