Spark Structured Streaming - 一次测试一批

Spark Structured Streaming - testing one batch at a time

我正在尝试为我已经实现的自定义 MicroBatchReadSupport 数据源创建测试。

为此,我想一次调用一个批次,它将使用此 DataSource 读取数据(我已经创建了适当的模拟)。我想调用一个批次,验证是否读取了正确的数据(目前通过将其保存到内存接收器并检查输出),然后才调用下一个批次并验证它的输出。

我找不到一个接一个地调用每个批次的方法。 如果我使用 streamingQuery.processAllAvailable(),批次将一个接一个地调用,不允许我分别验证每个批次的输出。使用 trigger(Trigger.Once()) 也没有帮助,因为它执行了一批,我无法继续下一批。

有什么方法可以做我想做的事吗?

目前这是我的基本代码:

val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
    val dsw: DataStreamWriter[Row] = dataFrame.writeStream
      .format("memory")
      .queryName("test_output")
    val streamingQuery = dsw
      .start()
    streamingQuery.processAllAvailable()

我最后做的是使用运行一次的 DataStreamWriter 设置测试,但将当前状态保存到检查点。因此,每次我们调用 dsw.start() 时,都会根据检查点从最新的偏移量恢复新批次。我还将数据保存到 globalTempView 中,因此我将能够以与使用内存接收器类似的方式查询数据。为此,我使用 foreachBatch(仅自 Spark 2.4 起可用)。

这是代码:

val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw = getNewDataStreamWriter(dataFrame)

testFirstBatch(dsw)
testSecondBatch(dsw)

private def getNewDataStreamWriter(dataFrame: DataFrame) = {
    val checkpointTempDir = Files.createTempDirectory("tests").toAbsolutePath.toString
    val dsw: DataStreamWriter[Row] = dataFrame.writeStream
    .trigger(Trigger.Once())
    .option("checkpointLocation", checkpointTempDir)
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.createOrReplaceGlobalTempView("input_data")
    }
    dsw
}

而每批(如testFirstBatch)的实际测试代码为:

val rows = processNextBatch(dsw)
assertResult(10)(rows.length)

private def processNextBatch(dsw: DataStreamWriter[Row]) = {
    val streamingQuery = dsw
        .start()
    streamingQuery.processAllAvailable()
    sparkSession.sql("select * from global_temp.input_data").collect()
}