Spark Structured Streaming - 一次测试一批
Spark Structured Streaming - testing one batch at a time
我正在尝试为我已经实现的自定义 MicroBatchReadSupport
数据源创建测试。
为此,我想一次调用一个批次,它将使用此 DataSource 读取数据(我已经创建了适当的模拟)。我想调用一个批次,验证是否读取了正确的数据(目前通过将其保存到内存接收器并检查输出),然后才调用下一个批次并验证它的输出。
我找不到一个接一个地调用每个批次的方法。
如果我使用 streamingQuery.processAllAvailable()
,批次将一个接一个地调用,不允许我分别验证每个批次的输出。使用 trigger(Trigger.Once())
也没有帮助,因为它执行了一批,我无法继续下一批。
有什么方法可以做我想做的事吗?
目前这是我的基本代码:
val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw: DataStreamWriter[Row] = dataFrame.writeStream
.format("memory")
.queryName("test_output")
val streamingQuery = dsw
.start()
streamingQuery.processAllAvailable()
我最后做的是使用运行一次的 DataStreamWriter 设置测试,但将当前状态保存到检查点。因此,每次我们调用 dsw.start()
时,都会根据检查点从最新的偏移量恢复新批次。我还将数据保存到 globalTempView 中,因此我将能够以与使用内存接收器类似的方式查询数据。为此,我使用 foreachBatch
(仅自 Spark 2.4 起可用)。
这是代码:
val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw = getNewDataStreamWriter(dataFrame)
testFirstBatch(dsw)
testSecondBatch(dsw)
private def getNewDataStreamWriter(dataFrame: DataFrame) = {
val checkpointTempDir = Files.createTempDirectory("tests").toAbsolutePath.toString
val dsw: DataStreamWriter[Row] = dataFrame.writeStream
.trigger(Trigger.Once())
.option("checkpointLocation", checkpointTempDir)
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.createOrReplaceGlobalTempView("input_data")
}
dsw
}
而每批(如testFirstBatch
)的实际测试代码为:
val rows = processNextBatch(dsw)
assertResult(10)(rows.length)
private def processNextBatch(dsw: DataStreamWriter[Row]) = {
val streamingQuery = dsw
.start()
streamingQuery.processAllAvailable()
sparkSession.sql("select * from global_temp.input_data").collect()
}
我正在尝试为我已经实现的自定义 MicroBatchReadSupport
数据源创建测试。
为此,我想一次调用一个批次,它将使用此 DataSource 读取数据(我已经创建了适当的模拟)。我想调用一个批次,验证是否读取了正确的数据(目前通过将其保存到内存接收器并检查输出),然后才调用下一个批次并验证它的输出。
我找不到一个接一个地调用每个批次的方法。
如果我使用 streamingQuery.processAllAvailable()
,批次将一个接一个地调用,不允许我分别验证每个批次的输出。使用 trigger(Trigger.Once())
也没有帮助,因为它执行了一批,我无法继续下一批。
有什么方法可以做我想做的事吗?
目前这是我的基本代码:
val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw: DataStreamWriter[Row] = dataFrame.writeStream
.format("memory")
.queryName("test_output")
val streamingQuery = dsw
.start()
streamingQuery.processAllAvailable()
我最后做的是使用运行一次的 DataStreamWriter 设置测试,但将当前状态保存到检查点。因此,每次我们调用 dsw.start()
时,都会根据检查点从最新的偏移量恢复新批次。我还将数据保存到 globalTempView 中,因此我将能够以与使用内存接收器类似的方式查询数据。为此,我使用 foreachBatch
(仅自 Spark 2.4 起可用)。
这是代码:
val dataFrame = sparkSession.readStream.format("my-custom-data-source").load()
val dsw = getNewDataStreamWriter(dataFrame)
testFirstBatch(dsw)
testSecondBatch(dsw)
private def getNewDataStreamWriter(dataFrame: DataFrame) = {
val checkpointTempDir = Files.createTempDirectory("tests").toAbsolutePath.toString
val dsw: DataStreamWriter[Row] = dataFrame.writeStream
.trigger(Trigger.Once())
.option("checkpointLocation", checkpointTempDir)
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.createOrReplaceGlobalTempView("input_data")
}
dsw
}
而每批(如testFirstBatch
)的实际测试代码为:
val rows = processNextBatch(dsw)
assertResult(10)(rows.length)
private def processNextBatch(dsw: DataStreamWriter[Row]) = {
val streamingQuery = dsw
.start()
streamingQuery.processAllAvailable()
sparkSession.sql("select * from global_temp.input_data").collect()
}