如何使用 mockito 对 flink 定时器进行单元测试

How to unit test flink timers using mockito

我有一个进程函数,我正在使用 mockito 进行单元测试。

class SampleProcessFunction : KeyedProcessFunction<String, String, String>() {{
        override fun processElement(dm: String, ctx: Context, out: Collector<String>) {
            var retrieveState = state.value()
            // schedule the next timer  from the current event time
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
        } 
    
     override fun onTimer(scheduledTimer: Long, ctx: OnTimerContext?, out: Collector<String>) {
            val retrieveState = state.value()
    
            if(scheduledTimer == (ctx.timestamp()+3000)) {
                         out.collect(presenceEvent)
            }
        }
}

我想使用 mockito 对流程函数进行单元测试

 internal class PresenceDetectionProcessFunctionTest {
    
        private lateinit var out: Collector<String>
        private lateinit var mockPresenceState: ValueState<Presence>
    
    
        @BeforeEach
        fun setUp() {
            SampleProcessFunction = SampleProcessFunction()
            out = mock(Collector::class.java) as Collector<PresenceEvent>
            val mockRunTimeContext = mock(RuntimeContext::class.java)
            mockSampleState = mock(ValueState::class.java) as ValueState<SampleState>
           
    
            whenever(mockRunTimeContext.getState(ValueStateDescriptor("sample-state", 
             Sample::class.java))).thenReturn(mockSampleState)      
            
             sampleProcessFunction.runtimeContext = mockRunTimeContext
            sampleProcessFunction.open(Configuration())
        }
    
    @Test
    fun `test sample process function`() {
       
        sampleProcessFunction.processElement("Hellp, context, out)
        Mockito.verify(mockPresenceState, times(1)).update(result.capture())
       }

我们如何在这里测试计时器,在我的过程中,我已经注册了事件计时器,但是当我调试时,在示例过程函数中的 ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000) 处抛出 NPE,因为我没有为计时器设置任何测试配置在我的测试中。我们如何在这里测试 flink 定时器功能?有什么建议吗?

一种方法是使用 Flink 的测试工具,如 described in the documentation

这涉及:

  • 为您的构建添加一些依赖项
  • 使用适当的运算符实例化测试工具
  • 在一些测试中使用测试工具

这是一个例子:

public class MyKeyedProcessFunctionTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;

    @Before
    public void setupTestHarness() throws Exception {
        testHarness = new OneInputStreamOperatorTestHarness<>(new KeyedProcessOperator<>(new MyProcessFunction()));
        testHarness.open();
    }

    @Test
    public void testingStatefulFunction() throws Exception {
        assertThat(testHarness.numKeyedStateEntries(), is(0));
        testHarness.setProcessingTime(0L);
        
        // state gets created
        testHarness.processElement(2L, 100L);
        assertThat(testHarness.numKeyedStateEntries(), is(not(0)));
        
        // state is eventually cleared
        testHarness.setProcessingTime(3600000L);
        assertThat(testHarness.numKeyedStateEntries(), is(0));
    }

    @Test
    public void testTimelyOperator() throws Exception {
    
        // setup initial conditions
        testHarness.processWatermark(0L);
        assertThat(testHarness.numEventTimeTimers(), is(0));
    
        // send in some data
        testHarness.processElement(3L, 100L);
    
        // verify timer
        assertThat(testHarness.numEventTimeTimers(), is(1));
    
        // fire timer
        testHarness.processWatermark(20);
        assertThat(testHarness.numEventTimeTimers(), is(0));
    
        // verify results
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
        assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}