为什么不在 RichFlatMapFunction 中处理数据
why data is not processed in RichFlatMapFunction
为了提高数据处理的性能,我们将事件存储到地图中,直到事件计数达到 100 时才处理它们。
同时在open方法中启动一个定时器,所以每60秒处理一次数据
这适用于 flink 版本为 1.11.3,
升级flink版本后1.13.0
我发现有时事件从Kafka持续消费,但没有在RichFlatMapFunction中处理,这意味着数据丢失。
重新启动服务后,一切正常,但几个小时后,同样的事情又发生了。
此 flink 版本有任何已知问题吗?任何建议表示赞赏。
public class MyJob {
public static void main(String[] args) throws Exception {
...
DataStream<String> rawEventSource = env.addSource(flinkKafkaConsumer);
...
}
public class MyMapFunction extends RichFlatMapFunction<String, String> implements Serializable {
@Override
public void open(Configuration parameters) {
...
long periodTimeout = 60;
pool.scheduleAtFixedRate(() -> {
// processing data
}, periodTimeout, periodTimeout, TimeUnit.SECONDS);
}
@Override
public void flatMap(String message, Collector<String> out) {
// store event to map
// count event,
// when count = 100, start data processing
}
}
你应该避免在 Flink 函数中使用用户线程和定时器。支持的机制是使用带有处理时间计时器的 KeyedProcessFunction。
为了提高数据处理的性能,我们将事件存储到地图中,直到事件计数达到 100 时才处理它们。 同时在open方法中启动一个定时器,所以每60秒处理一次数据
这适用于 flink 版本为 1.11.3,
升级flink版本后1.13.0
我发现有时事件从Kafka持续消费,但没有在RichFlatMapFunction中处理,这意味着数据丢失。 重新启动服务后,一切正常,但几个小时后,同样的事情又发生了。
此 flink 版本有任何已知问题吗?任何建议表示赞赏。
public class MyJob {
public static void main(String[] args) throws Exception {
...
DataStream<String> rawEventSource = env.addSource(flinkKafkaConsumer);
...
}
public class MyMapFunction extends RichFlatMapFunction<String, String> implements Serializable {
@Override
public void open(Configuration parameters) {
...
long periodTimeout = 60;
pool.scheduleAtFixedRate(() -> {
// processing data
}, periodTimeout, periodTimeout, TimeUnit.SECONDS);
}
@Override
public void flatMap(String message, Collector<String> out) {
// store event to map
// count event,
// when count = 100, start data processing
}
}
你应该避免在 Flink 函数中使用用户线程和定时器。支持的机制是使用带有处理时间计时器的 KeyedProcessFunction。