Flink window 函数 getResult 未触发

Flink window function getResult not fired

我正在尝试在我的 Flink 作业中使用事件时间,并使用 BoundedOutOfOrdernessTimestampExtractor 提取时间戳并生成水印。 但是我有一些具有稀疏流的输入 Kafka,它可以很长时间没有数据,这使得 AggregateFunction 中的 getResult 根本没有被调用。我可以看到数据进入 add 函数。

我设置了getEnv().getConfig().setAutoWatermarkInterval(1000L); 我试过了

 eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
            .allowedLateness(WINDOW_LATENESS)
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

还有会话window

eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

所有水印指标显示No Watermark 怎么让Flink忽略那个没有水印的东西?

仅供参考,这通常称为 "idle source" 问题。发生这种情况是因为每当 Flink 运算符有两个或更多输入时,其水印是其输入中水印的最小值。如果这些输入之一停止,其水印将不再前进。

请注意,Flink 没有每个键的水印——一个给定的运算符通常在许多键的事件中多路复用。只要一些事件流过给定任务的输入流,它的水印就会前进,并且空闲键的事件时间计时器仍然会触发。要发生此 "idle source" 问题,任务必须具有已完全空闲的输入流。

如果可以安排,最好的解决方案是让您的数据源包含保活事件。这将使您能够自信地提升水印,因为您知道源只是空闲的,而不是离线的。

如果那不可能,并且如果您有一些不空闲的源,那么您可以在 BoundedOutOfOrdernessTimestampExtractor 前面(以及 keyBy 之前)放置一个 rebalance(),这样每个实例继续接收一些事件并可以推进其水印。这是以额外的网络洗牌为代价的。

也许最常用的解决方案是使用水印生成器来检测空闲并根据处理时间计时器人为地推进水印。 ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor 就是一个例子。

引入了具有空闲功能的新水印。 Flink 在计算最小值时将忽略这些空闲水印,因此将考虑包含数据的单个分区。 https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/api/common/eventtime/WatermarksWithIdleness.html

我有同样的问题 - 一个 src 可能长时间处于非活动状态。
以下解决方案基于 WatermarksWithIdleness.

这是一个演示概念的独立 Flink 作业。

package com.demo.playground.flink.sleepysrc;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.eventtime.WatermarksWithIdleness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;


public class SleepyJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final EventGenerator eventGenerator = new EventGenerator();
        WatermarkStrategy<Event> strategy = WatermarkStrategy.
                <Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).
                withIdleness(Duration.ofSeconds(Constants.IDLE_TIME_SEC)).
                withTimestampAssigner((event, timestamp) -> event.timestamp);
        final DataStream<Event> events = env.addSource(eventGenerator).assignTimestampsAndWatermarks(strategy);
        KeyedStream<Event, String> eventStringKeyedStream = events.keyBy((Event event) -> event.id);
        WindowedStream<Event, String, TimeWindow> windowedStream = eventStringKeyedStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(Constants.SESSION_WINDOW_GAP)));
        windowedStream.allowedLateness(Time.milliseconds(1000));
        SingleOutputStreamOperator<Object> result = windowedStream.process(new ProcessWindowFunction<Event, Object, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Event> events, Collector<Object> collector) {
                int counter = 0;
                for (Event e : events) {
                    Utils.print(++counter + ") inside process: " + e);
                }
                Utils.print("--- Process Done ----");
            }
        });
        result.print();
        env.execute("Sleepy flink src demo");
    }


    private static class Event {
        public Event(String id) {
            this.timestamp = System.currentTimeMillis();
            this.eventData = "not_important_" + this.timestamp;
            this.id = id;
        }

        @Override
        public String toString() {
            return "Event{" +
                    "id=" + id +
                    ", timestamp=" + timestamp +
                    ", eventData='" + eventData + '\'' +
                    '}';
        }

        public String id;
        public long timestamp;
        public String eventData;
    }

    private static class EventGenerator implements SourceFunction<Event> {

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            /**
             *  Here is the sleepy src - after NUM_OF_EVENTS events are collected , the code goes to a SHORT_SLEEP_TIME sleep
             *  We would like to detect this inactivity and FIRE the window
             */
            int counter = 0;
            while (running) {
                String id = Long.toString(System.currentTimeMillis());
                Utils.print(String.format("Generating %d events with id %s", 2 * Constants.NUM_OF_EVENTS, id));
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                // here we create a delay:
                // a time of inactivity where
                // we would like to FIRE the window
                Thread.sleep(Constants.SHORT_SLEEP_TIME);
                counter = 0;
                while (counter < Constants.NUM_OF_EVENTS) {
                    Event event = new Event(id);
                    ctx.collect(event);
                    counter++;
                    Thread.sleep(Constants.VERY_SHORT_SLEEP_TIME);
                }
                Thread.sleep(Constants.LONG_SLEEP_TIME);
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }

        private volatile boolean running = true;

    }

    private static final class Constants {
        public static final int VERY_SHORT_SLEEP_TIME = 300;
        public static final int SHORT_SLEEP_TIME = 8000;
        public static final int IDLE_TIME_SEC = 5;
        public static final int LONG_SLEEP_TIME = SHORT_SLEEP_TIME * 5;
        public static final long SESSION_WINDOW_GAP = 60 * 1000;
        public static final int NUM_OF_EVENTS = 4;
    }

    private static final class Utils {
        public static void print(Object obj) {
            System.out.println(new java.util.Date() + " > " + obj);
        }
    }
}

对于其他人,如果您使用的是 Kafka,请确保所有主题的分区都有数据

我知道这听起来很愚蠢,但在我的例子中,我只有一个来源,但问题仍然存在,因为我在具有 10 个分区的单个 Kafka 主题(单一来源)中使用非常少的数据进行测试。数据集太小以至于主题的一些分区没有任何东西可以提供,虽然我只有一个来源(一个主题),但 Flink 没有增加 Watermark。

当我将源切换到具有单个分区的主题时,水印开始前进。