How does Apache Flink generate watermarks when you ingest data from Apache Kafka?
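I can't figure out how watermarks are actually supposed to work when ingesting data from Apache Kafka.
- I read that Flink handles watermarks automatically by taking the timestamp from the message, but it doesn't say exactly where that timestamp comes from. The message payload, the headers, or the CreateTime?
- I tried extracting the Unix timestamp in milliseconds from the payload, putting it in a header, and setting it as the CreateTime, but nothing works. The watermark never advances, so the event-time windows never fire.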
Event format:
hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...
Topic, created with: kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create
Kafka version 3.0.0, Flink 1.14.2
Thanks in advance.
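import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class WatermarkTest { // class name is illustrative

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("test-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("testerino")
                .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
                    @Override
                    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
                        System.out.println(record); // raw record, including its timestamp and timestamp type
                        out.collect(new String(record.value(), StandardCharsets.UTF_8));
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        // Event time is parsed from the second CSV field, e.g. "hello,1641369936000".
        DataStreamSource<String> streamSource = env.fromSource(
                stringKafkaSource,
                WatermarkStrategy.<String>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.split(",")[1])),
                "source");

        streamSource
                .keyBy(k -> k.split(",")[0])
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Debugging trigger: logs each element together with the current watermark.
                .trigger(new Trigger<String, TimeWindow>() {
                    @Override
                    public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.printf("added elem: %s | timestamp: %s | window: %s | watermark: %d%n",
                                element, timestamp, window, ctx.getCurrentWatermark());
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("processing time trigger");
                        return TriggerResult.FIRE_AND_PURGE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("event time trigger");
                        return TriggerResult.FIRE_AND_PURGE;
                    }

                    @Override
                    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                        System.out.println("clear");
                    }
                })
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
                        System.out.println("watermark: " + context.currentWatermark());
                        out.collect(key);
                    }
                });

        env.execute("watermark test");
    }
}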
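To see when the window in the code above should fire, here is a plain-Java sketch of the arithmetic, assuming Flink's default window offset of zero and the sample events from the question. All four events land in the window [1641369935000, 1641369940000), whose last timestamp is 1641369939999, so that window can only fire once the watermark reaches 1641369939999. TumblingWindowMath and its methods are hypothetical names for this illustration only; they are not Flink API (Flink's own helper is TimeWindow.getWindowStartWithOffset).

```java
// Hypothetical model (not Flink code) of how a 5-second tumbling event-time
// window is chosen for a timestamp and when it becomes eligible to fire.
class TumblingWindowMath {
    static final long SIZE_MS = 5_000L;

    // Start of the window containing ts, assuming offset 0 and a non-negative ts.
    static long windowStart(long ts) {
        return ts - (ts % SIZE_MS);
    }

    // Last timestamp that still belongs to the window [start, start + SIZE_MS).
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + SIZE_MS - 1;
    }

    // An event-time window can fire once the watermark reaches its max timestamp.
    static boolean canFire(long ts, long watermark) {
        return watermark >= windowMaxTimestamp(ts);
    }

    public static void main(String[] args) {
        long[] events = {1641369936000L, 1641369937000L, 1641369938000L, 1641369939000L};
        for (long ts : events) {
            System.out.printf("%d -> window [%d, %d)%n",
                    ts, windowStart(ts), windowStart(ts) + SIZE_MS);
        }
    }
}
```

In other words, even with a working watermark, the four sample events alone are not enough: some event with a timestamp of at least 1641369940000 has to arrive to push the watermark past the end of the first window.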
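Flink never provides watermarks automatically. The KafkaSource does, however, take the timestamp of each Kafka record (its CreateTime or LogAppendTime, depending on the topic's message.timestamp.type setting) and uses it as the timestamp of the StreamRecords it produces. That is the timestamp passed to your timestamp assigner as its second argument.
I believe https://whosebug.com/a/70101290/2000823 explains why you aren't getting any results.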
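Put differently, the watermarks come from the WatermarkStrategy passed to env.fromSource(...), not from Kafka. The plain-Java sketch below models, under simplifying assumptions, what the generator behind WatermarkStrategy.forMonotonousTimestamps() does: it remembers the largest event timestamp seen so far and, when asked periodically (every 200 ms by default, via pipeline.auto-watermark-interval), reports that maximum minus 1 as the watermark. MonotonousWatermarkModel and assignTimestamp are hypothetical names; this is an illustration, not Flink's implementation.

```java
// Hypothetical, simplified model of the generator behind
// WatermarkStrategy.forMonotonousTimestamps(): track the largest timestamp
// seen and report (max - 1) as the current watermark.
class MonotonousWatermarkModel {
    // Chosen so that currentWatermark() is Long.MIN_VALUE before any event.
    private long maxTimestamp = Long.MIN_VALUE + 1;

    // Called once per record with the timestamp the assigner returned.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called periodically by the runtime in the real system.
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    // Mirrors the question's assigner: it ignores the Kafka record timestamp
    // it receives and parses the second CSV field of the payload instead.
    static long assignTimestamp(String event, long kafkaRecordTimestamp) {
        return Long.parseLong(event.split(",")[1]);
    }

    public static void main(String[] args) {
        MonotonousWatermarkModel gen = new MonotonousWatermarkModel();
        String[] events = {"hello,1641369936000", "hello,1641369939000"};
        for (String e : events) {
            gen.onEvent(assignTimestamp(e, 0L)); // 0L stands in for the Kafka record timestamp
            System.out.println("watermark: " + gen.currentWatermark());
        }
    }
}
```

If the payload timestamp and the record's CreateTime are the same value, returning the second argument unchanged would give the same result; parsing the payload only matters when the two differ.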
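One likely cause of this symptom is stated here as an assumption, not as a summary of the linked answer. StreamExecutionEnvironment.createLocalEnvironment() runs with a parallelism equal to the number of CPU cores, but the testerino topic has a single partition, so every KafkaSource subtask except one reads nothing and never advances its watermark. Because a downstream operator's watermark is the minimum over all of its inputs, those empty subtasks hold the window operator's watermark at Long.MIN_VALUE. The plain-Java model below (CombinedWatermarkModel is a hypothetical name, not Flink code) shows the effect, and how excluding idle inputs unblocks it.

```java
// Hypothetical model: an operator's watermark is the minimum of the watermarks
// of its upstream source subtasks; inputs marked idle are left out.
class CombinedWatermarkModel {
    static long combined(long[] subtaskWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < subtaskWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, subtaskWatermarks[i]);
        }
        // With no active input, this model simply reports "no progress".
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Assumed setup: 4 source subtasks, only subtask 0 owns the single partition.
        long[] wms = {1641369939999L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE};
        System.out.println("no idleness:   " + combined(wms, new boolean[]{false, false, false, false}));
        System.out.println("with idleness: " + combined(wms, new boolean[]{false, true, true, true}));
    }
}
```

In the actual job, the usual remedies would be env.setParallelism(1) (or any parallelism no larger than the partition count), or adding .withIdleness(Duration.ofSeconds(...)) to the WatermarkStrategy so that subtasks without data are marked idle and ignored.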