使用 Flink 使用 DateStreamSource<List<T>> 分配水印的正确方法
Proper way to assign watermark with DateStreamSource<List<T>> using Flink
我有一个持续的 JSONArray 数据生成到 Kafka 主题,我想用 EventTime characteristic.In 处理记录为了达到这个目标,我必须为 JSONArray 中包含的每条记录分配水印。
我没有找到一个方便的方法来实现这个 goal.My 解决方案是使用来自 DataStreamSource> 的数据,然后迭代 List 并使用匿名 ProcessFunction 收集对象到下游,最后将水印分配给这个下游。
主要代码如下:
DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
.process(new ProcessFunction<List<MockData>, MockData>() {
@Override
public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
throws Exception {
value.forEach(mockData -> out.collect(mockData));
}
});
convertToPojo.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
@Override
public long extractTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
毫无疑问,代码似乎没问题,运行 没有错误,因为 well.But ProcessWindowFunction 从未 triggered.I 跟踪过 Flink 源代码,从未找到 EventTimeTrigger returns TriggerResult.FIRE,一直由 TriggerContext.getCurrentWatermark returns Long.MIN_VALUE 引起。
在事件时间处理列表的正确方法是什么?任何建议将不胜感激。
问题是您将 keyBy 和 window 操作应用于 convertToPojo 流,而不是带有时间戳和水印(您没有分配给变量)的流。
如果您或多或少像这样编写代码,它应该可以工作:
listDataStreamSource = KafkaSource ...
convertToPojo = listDataStreamSource.process ...
pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
countStream = pojoPlusWatermarks.keyBy ...
在 convertToPojo 流上调用 assignTimestampsAndWatermarks 不会修改该流,而是创建一个包含时间戳和水印的新数据流对象。您需要将 windowing 应用于该新数据流。
我有一个持续的 JSONArray 数据生成到 Kafka 主题,我想用 EventTime characteristic.In 处理记录为了达到这个目标,我必须为 JSONArray 中包含的每条记录分配水印。
我没有找到一个方便的方法来实现这个 goal.My 解决方案是使用来自 DataStreamSource> 的数据,然后迭代 List 并使用匿名 ProcessFunction 收集对象到下游,最后将水印分配给这个下游。
主要代码如下:
DataStreamSource<List<MockData>> listDataStreamSource = KafkaSource.genStream(env);
SingleOutputStreamOperator<MockData> convertToPojo = listDataStreamSource
.process(new ProcessFunction<List<MockData>, MockData>() {
@Override
public void processElement(List<MockData> value, Context ctx, Collector<MockData> out)
throws Exception {
value.forEach(mockData -> out.collect(mockData));
}
});
convertToPojo.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(5)) {
@Override
public long extractTimestamp(MockData element) {
return element.getTimestamp();
}
});
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = convertToPojo
.keyBy("country").window(
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
.process(
new FlinkEventTimeCountFunction()).name("count elements");
毫无疑问,代码似乎没问题,运行 没有错误,因为 well.But ProcessWindowFunction 从未 triggered.I 跟踪过 Flink 源代码,从未找到 EventTimeTrigger returns TriggerResult.FIRE,一直由 TriggerContext.getCurrentWatermark returns Long.MIN_VALUE 引起。
在事件时间处理列表的正确方法是什么?任何建议将不胜感激。
问题是您将 keyBy 和 window 操作应用于 convertToPojo 流,而不是带有时间戳和水印(您没有分配给变量)的流。
如果您或多或少像这样编写代码,它应该可以工作:
listDataStreamSource = KafkaSource ...
convertToPojo = listDataStreamSource.process ...
pojoPlusWatermarks = convertToPojo.assignTimestampsAndWatermarks ...
countStream = pojoPlusWatermarks.keyBy ...
在 convertToPojo 流上调用 assignTimestampsAndWatermarks 不会修改该流,而是创建一个包含时间戳和水印的新数据流对象。您需要将 windowing 应用于该新数据流。