为什么不在 RichFlatMapFunction 中处理数据

why data is not processed in RichFlatMapFunction

为了提高数据处理的性能,我们将事件存储到地图中,直到事件计数达到 100 时才处理它们。 同时在open方法中启动一个定时器,所以每60秒处理一次数据

这适用于 flink 版本为 1.11.3,

升级flink版本后1.13.0

我发现有时事件从Kafka持续消费,但没有在RichFlatMapFunction中处理,这意味着数据丢失。 重新启动服务后,一切正常,但几个小时后,同样的事情又发生了。

此 flink 版本有任何已知问题吗?任何建议表示赞赏。

public class MyJob {
    public static void main(String[] args) throws Exception {
        ...
        DataStream<String> rawEventSource = env.addSource(flinkKafkaConsumer);
        ...
    }
public class MyMapFunction extends RichFlatMapFunction<String, String> implements Serializable {

    @Override
    public void open(Configuration parameters) {
        ...
        long periodTimeout = 60;
        pool.scheduleAtFixedRate(() -> {
            // processing data
        }, periodTimeout, periodTimeout, TimeUnit.SECONDS);
    }
    
    @Override
    public void flatMap(String message, Collector<String> out) {
        // store event to map 
        // count event, 
        // when count = 100, start data processing
    }
}

你应该避免在 Flink 函数中使用用户线程和定时器。支持的机制是使用带有处理时间计时器的 KeyedProcessFunction。