How does Apache Flink generate watermarks when you ingest data from Apache Kafka?

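I can't figure out how watermarks are supposed to work when you ingest data from Apache Kafka.

  1. I read that Flink handles watermarks automatically by taking the timestamp from the message, but nothing says exactly where that timestamp comes from: the message payload, the headers, or CreateTime?
  2. I tried extracting a Unix timestamp in milliseconds from the payload, putting it in a header, and setting it as CreateTime, but none of it worked. The watermark does not advance, so the event-time window never fires.

Event format: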

hello,1641369936000
hello,1641369937000
hello,1641369938000
hello,1641369939000
...

Topic: kafka-topics --bootstrap-server localhost:9092 --topic testerino --partitions 1 --replication-factor 1 --create

Kafka version 3.0.0, Flink 1.14.2

Thanks in advance

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setGroupId("test-group")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setTopics("testerino")
        .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
            @Override
            public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {
                System.out.println(record);
                out.collect(new String(record.value()));
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return TypeInformation.of(String.class);
            }
        })
        .build();

DataStreamSource<String> streamSource = env.fromSource(
        stringKafkaSource,
        WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
            return Long.parseLong(event.split(",")[1]);
        }),
        "source"
);

streamSource
        .keyBy(k -> k.split(",")[0])
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .trigger(new Trigger<String, TimeWindow>() {
            @Override
            public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.printf("added elem: %s | timestamp: %s | window: %s| watermark: %d%n",
                        element, timestamp, window, ctx.getCurrentWatermark());
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("processing time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("event time trigger");
                return TriggerResult.FIRE_AND_PURGE;
            }

            @Override
            public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                System.out.println("clear");
            }
        })
        .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
            @Override
            public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                System.out.println("watermark: " + context.currentWatermark());
                out.collect(s);
            }
        });

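Flink never provides watermarks automatically; you always have to supply a WatermarkStrategy. What the KafkaSource does do is take the timestamp from the Kafka record's metadata (the record timestamp, for example CreateTime, not the payload or a user-defined header) and use it to set the timestamp of the StreamRecords it produces. This is the timestamp passed to your timestamp assigner as its second argument, and your assigner is free to ignore it and return a value parsed from the payload instead, as yours does.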
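To make the flow of timestamps concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names are hypothetical and it is a simplified model, not Flink's implementation. It only illustrates two points: the assigner receives the Kafka record timestamp as its second argument and may ignore it, and a monotonous-timestamps strategy reports the highest timestamp seen so far minus 1 ms.

```java
import java.util.List;

public class TimestampFlowSketch {

    // Mirrors the lambda in the question: the second argument is the timestamp
    // the source attached to the record (the Kafka record timestamp). It is
    // ignored here in favor of the value parsed from the payload.
    static long extractTimestamp(String event, long kafkaRecordTimestamp) {
        return Long.parseLong(event.split(",")[1]);
    }

    // Simplified model of a monotonous-timestamps generator: the highest
    // timestamp seen so far, minus 1 ms. Long.MIN_VALUE means "no watermark yet".
    static long watermarkAfter(List<String> events, long kafkaRecordTimestamp) {
        long maxTimestamp = Long.MIN_VALUE;
        for (String event : events) {
            maxTimestamp = Math.max(maxTimestamp, extractTimestamp(event, kafkaRecordTimestamp));
        }
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    public static void main(String[] args) {
        List<String> events = List.of("hello,1641369936000", "hello,1641369937000");
        // The Kafka record timestamp (0 here, a made-up value) does not affect the result.
        System.out.println(watermarkAfter(events, 0L));
    }
}
```

With an assigner like this, changing CreateTime or adding headers on the producer side makes no difference, because only the payload value is used.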

I believe https://stackoverflow.com/a/70101290/2000823 explains why you aren't getting any results.
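I have not reproduced the linked answer here. One likely cause in a setup like this, and this is an assumption on my part, is that createLocalEnvironment() runs the source with more parallel subtasks than the topic has partitions (one). A downstream operator takes the minimum watermark over all of its inputs that are not marked idle, and the Kafka connector documentation notes that the source does not go idle automatically when parallelism exceeds the partition count. The plain-Java sketch below models that rule; the class and method names are hypothetical and it is a simplification, not Flink code.

```java
public class WatermarkMinSketch {

    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Downstream watermark = minimum over all inputs that are not marked idle.
    // Simplification: if every input is idle, this returns NO_WATERMARK.
    static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    // An event-time window [start, end) can fire once the watermark reaches end - 1.
    static boolean windowFires(long watermark, long windowEndExclusive) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        // Four source subtasks, but only the first one owns the single partition.
        long[] watermarks = {1641369939999L, NO_WATERMARK, NO_WATERMARK, NO_WATERMARK};
        long windowEnd = 1641369940000L; // end of the 5 s window holding the sample events

        // No subtask marked idle: the empty subtasks hold the watermark back.
        long stuck = combine(watermarks, new boolean[] {false, false, false, false});
        System.out.println(stuck + " fires=" + windowFires(stuck, windowEnd));

        // Empty subtasks marked idle: only the active subtask counts.
        long advancing = combine(watermarks, new boolean[] {false, true, true, true});
        System.out.println(advancing + " fires=" + windowFires(advancing, windowEnd));
    }
}
```

If this is what is happening, either env.setParallelism(1) or adding .withIdleness(...) with a suitable Duration to the WatermarkStrategy should let the watermark advance. Note also that with the sample events the window covering 1641369936000 to 1641369939000 ends at 1641369940000, so it cannot fire until an event with a timestamp of at least 1641369940000 has arrived.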