How do I unit test processing-time timers?
I am writing unit tests for a CoProcessFunction. Is there a way to manually advance processing time in the test so that onTimer is called?
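Flink provides test harnesses for testing functions that use timers and state. They let you control time and verify properties of the state.
The following code block, copied from the official documentation, gives you an idea of how a test harness is used.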
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    // StatefulFlatMapFunction is the user-defined function under test (not shown here)
    private StatefulFlatMapFunction statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {
        // instantiate the user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap the user-defined function into the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configure the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {
        // push (timestamped) elements into the operator (and hence the user-defined function)
        testHarness.processElement(2L, 100L);

        // trigger event-time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        // trigger processing-time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        // retrieve the list of emitted records for assertions
        // (containsInExactlyThisOrder is the matcher name used in the documentation; supply your own matcher)
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        // retrieve the list of records emitted to a specific side output for assertions (ProcessFunction only)
        // assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}
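To make the effect of setProcessingTime more concrete, here is a toy model of a manually driven clock, written in plain Java with no Flink dependency. It is only a sketch of the idea and not Flink's implementation: the class name ManualProcessingTime and its methods are hypothetical, and it assumes that a timer fires once the clock reaches or passes its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of a manually driven processing-time clock (hypothetical; not Flink code). */
class ManualProcessingTime {
    // Pending timers keyed by firing timestamp; the sorted map lets them fire in time order.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();
    private long now = 0L;

    long currentProcessingTime() {
        return now;
    }

    void registerTimer(long timestamp, Runnable callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    /** Moves the clock to newTime and runs every timer due at or before it (assumed semantics). */
    void setProcessingTime(long newTime) {
        now = newTime;
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            // pollFirstEntry removes the entry first, so a callback may safely register new timers
            for (Runnable callback : timers.pollFirstEntry().getValue()) {
                callback.run();
            }
        }
    }
}

class ManualProcessingTimeDemo {
    public static void main(String[] args) {
        ManualProcessingTime clock = new ManualProcessingTime();
        List<Long> fired = new ArrayList<>();
        clock.registerTimer(100L, () -> fired.add(100L));
        clock.registerTimer(200L, () -> fired.add(200L));

        clock.setProcessingTime(99L);   // no timer is due yet
        clock.setProcessingTime(100L);  // the timer registered for 100 runs
        System.out.println(fired);
    }
}
```

In a test, the point is the same as with the harness: nothing happens until the test itself moves the clock, so the timer callbacks run deterministically, with no sleeping or waiting on the wall clock.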
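Please check the documentation for details on which dependencies to include, which harnesses exist, and how to use them. For a function with two inputs such as a CoProcessFunction, the two-input variants (TwoInputStreamOperatorTestHarness and KeyedTwoInputStreamOperatorTestHarness) play the same role.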