如何使用 mockito 对 flink 定时器进行单元测试
How to unit test flink timers using mockito
我有一个进程函数,我正在使用 mockito 进行单元测试。
class SampleProcessFunction : KeyedProcessFunction<String, String, String>() {{
override fun processElement(dm: String, ctx: Context, out: Collector<String>) {
var retrieveState = state.value()
// schedule the next timer from the current event time
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
}
override fun onTimer(scheduledTimer: Long, ctx: OnTimerContext?, out: Collector<String>) {
val retrieveState = state.value()
if(scheduledTimer == (ctx.timestamp()+3000)) {
out.collect(presenceEvent)
}
}
}
我想使用 mockito 对流程函数进行单元测试
internal class PresenceDetectionProcessFunctionTest {
private lateinit var out: Collector<String>
private lateinit var mockPresenceState: ValueState<Presence>
@BeforeEach
fun setUp() {
SampleProcessFunction = SampleProcessFunction()
out = mock(Collector::class.java) as Collector<PresenceEvent>
val mockRunTimeContext = mock(RuntimeContext::class.java)
mockSampleState = mock(ValueState::class.java) as ValueState<SampleState>
whenever(mockRunTimeContext.getState(ValueStateDescriptor("sample-state",
Sample::class.java))).thenReturn(mockSampleState)
sampleProcessFunction.runtimeContext = mockRunTimeContext
sampleProcessFunction.open(Configuration())
}
@Test
fun `test sample process function`() {
sampleProcessFunction.processElement("Hellp, context, out)
Mockito.verify(mockPresenceState, times(1)).update(result.capture())
}
我们如何在这里测试计时器,在我的过程中,我已经注册了事件计时器,但是当我调试时,在示例过程函数中的 ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
处抛出 NPE,因为我没有为计时器设置任何测试配置在我的测试中。我们如何在这里测试 flink 定时器功能?有什么建议吗?
一种方法是使用 Flink 的测试工具,如 described in the documentation。
这涉及:
- 为您的构建添加一些依赖项
- 使用适当的运算符实例化测试工具
- 在一些测试中使用测试工具
这是一个例子:
public class MyKeyedProcessFunctionTest {
private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
@Before
public void setupTestHarness() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(new KeyedProcessOperator<>(new MyProcessFunction()));
testHarness.open();
}
@Test
public void testingStatefulFunction() throws Exception {
assertThat(testHarness.numKeyedStateEntries(), is(0));
testHarness.setProcessingTime(0L);
// state gets created
testHarness.processElement(2L, 100L);
assertThat(testHarness.numKeyedStateEntries(), is(not(0)));
// state is eventually cleared
testHarness.setProcessingTime(3600000L);
assertThat(testHarness.numKeyedStateEntries(), is(0));
}
@Test
public void testTimelyOperator() throws Exception {
// setup initial conditions
testHarness.processWatermark(0L);
assertThat(testHarness.numEventTimeTimers(), is(0));
// send in some data
testHarness.processElement(3L, 100L);
// verify timer
assertThat(testHarness.numEventTimeTimers(), is(1));
// fire timer
testHarness.processWatermark(20);
assertThat(testHarness.numEventTimeTimers(), is(0));
// verify results
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
}
}
我有一个进程函数,我正在使用 mockito 进行单元测试。
class SampleProcessFunction : KeyedProcessFunction<String, String, String>() {{
override fun processElement(dm: String, ctx: Context, out: Collector<String>) {
var retrieveState = state.value()
// schedule the next timer from the current event time
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
}
override fun onTimer(scheduledTimer: Long, ctx: OnTimerContext?, out: Collector<String>) {
val retrieveState = state.value()
if(scheduledTimer == (ctx.timestamp()+3000)) {
out.collect(presenceEvent)
}
}
}
我想使用 mockito 对流程函数进行单元测试
internal class PresenceDetectionProcessFunctionTest {
private lateinit var out: Collector<String>
private lateinit var mockPresenceState: ValueState<Presence>
@BeforeEach
fun setUp() {
SampleProcessFunction = SampleProcessFunction()
out = mock(Collector::class.java) as Collector<PresenceEvent>
val mockRunTimeContext = mock(RuntimeContext::class.java)
mockSampleState = mock(ValueState::class.java) as ValueState<SampleState>
whenever(mockRunTimeContext.getState(ValueStateDescriptor("sample-state",
Sample::class.java))).thenReturn(mockSampleState)
sampleProcessFunction.runtimeContext = mockRunTimeContext
sampleProcessFunction.open(Configuration())
}
@Test
fun `test sample process function`() {
sampleProcessFunction.processElement("Hellp, context, out)
Mockito.verify(mockPresenceState, times(1)).update(result.capture())
}
我们如何在这里测试计时器,在我的过程中,我已经注册了事件计时器,但是当我调试时,在示例过程函数中的 ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
处抛出 NPE,因为我没有为计时器设置任何测试配置在我的测试中。我们如何在这里测试 flink 定时器功能?有什么建议吗?
一种方法是使用 Flink 的测试工具,如 described in the documentation。
这涉及:
- 为您的构建添加一些依赖项
- 使用适当的运算符实例化测试工具
- 在一些测试中使用测试工具
这是一个例子:
public class MyKeyedProcessFunctionTest {
private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
@Before
public void setupTestHarness() throws Exception {
testHarness = new OneInputStreamOperatorTestHarness<>(new KeyedProcessOperator<>(new MyProcessFunction()));
testHarness.open();
}
@Test
public void testingStatefulFunction() throws Exception {
assertThat(testHarness.numKeyedStateEntries(), is(0));
testHarness.setProcessingTime(0L);
// state gets created
testHarness.processElement(2L, 100L);
assertThat(testHarness.numKeyedStateEntries(), is(not(0)));
// state is eventually cleared
testHarness.setProcessingTime(3600000L);
assertThat(testHarness.numKeyedStateEntries(), is(0));
}
@Test
public void testTimelyOperator() throws Exception {
// setup initial conditions
testHarness.processWatermark(0L);
assertThat(testHarness.numEventTimeTimers(), is(0));
// send in some data
testHarness.processElement(3L, 100L);
// verify timer
assertThat(testHarness.numEventTimeTimers(), is(1));
// fire timer
testHarness.processWatermark(20);
assertThat(testHarness.numEventTimeTimers(), is(0));
// verify results
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
}
}