Flink 事件会话 Window 未发出记录

Flink Event Session Window not emitting records

我正在为一个由 id 和 window 使用 eventSessionWindow 键入的用户写一个管道来分组会话。我正在使用 Periodic WM 和一个自定义会话累加器,它将计算给定会话的事件。

发生的事情是我的 window 操作员正在消耗记录但没有发出。我不确定这里缺少什么。


FlinkKafkaConsumer010<String> eventSource =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), properties);
        eventSource.setStartFromLatest();

DataStream<Event> eventStream = env.addSource(eventSource
        ).flatMap(
                new FlatMapFunction<String, Event>() {

                    @Override
                    public void flatMap(String value, Collector<Event> out) throws Exception {
                        out.collect(Event.toEvent(value));
                    }
                }
        ).assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<Event>() {
                    long maxTime;

                    @Override
                    public long extractTimestamp(Event element, long previousElementTimestamp) {
                        maxTime = Math.max(previousElementTimestamp, maxTime);
                        return previousElementTimestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(maxTime);
                    }
                }
        );

       DataStream <Session> session_stream =eventStream.keyBy((KeySelector<Event, String>)value -> value.id)
                .window(EventTimeSessionWindows.withGap(Time.minutes(5)))

                .aggregate(new AggregateFunction<Event, pipe.SessionAccumulator, Session>() {
                    @Override
                    public pipe.SessionAccumulator createAccumulator() {
                        return new pipe.SessionAccumulator();
                    }

                    @Override
                    public pipe.SessionAccumulator add(Event e, pipe.SessionAccumulator sessionAccumulator) {
                        sessionAccumulator.add(e);
                        return sessionAccumulator;
                    }

                    @Override
                    public Session getResult(pipe.SessionAccumulator sessionAccumulator) {
                        return sessionAccumulator.getLocalValue();
                    }

                    @Override
                    public pipe.SessionAccumulator merge(pipe.SessionAccumulator prev, pipe.SessionAccumulator next) {
                        prev.merge(next);
                        return prev;
                    }

                }, new WindowFunction<Session, Session, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow timeWindow, Iterable<Session> iterable, Collector<Session> collector) throws Exception {
                        collector.collect(iterable.iterator().next());
                    }
                });


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
            );
            return sessionAccumulator;
        }
    }


    public static class SessionAccumulator implements Accumulator<Event, Session>{
        Session session;

        public SessionAccumulator(){
            session = new Session();
        }

        @Override
        public void add(Event e) {
            session.add(e);

        }

        @Override
        public Session getLocalValue() {
            return session;
        }

        @Override
        public void resetLocal() {
            session =  new Session();

        }

        @Override
        public void merge(Accumulator<Event, Session> accumulator) {
            session.merge(Collections.singletonList(accumulator.getLocalValue()));

        }

        @Override
        public Accumulator<Event, Session> clone() {
            SessionAccumulator sessionAccumulator = new SessionAccumulator();
            sessionAccumulator.session = new Session(
                    session.id,
                    session.lastEventTime,
                    session.earliestEventTime,
                    session.count;

            );
            return sessionAccumulator;
        }
    }

如果您的水印没有前进,这可以解释为什么 window 没有发出任何结果。可能的原因包括:

  • Kafka 尚未为您的事件添加时间戳,因此未设置 previousElementTimestamp。
  • 您有一个空闲的 Kafka 分区来保存水印。 (这是一个有点复杂的话题。如果这最终证明是你的问题的原因,而你卡住了,请回来提出一个新问题。)

另一种可能性是,事件之间永远不会有 5 分钟的间隔,在这种情况下,事件将在一个永无止境的会话中累积。

此外,您似乎没有安装水槽。如果您不打印或以其他方式将结果发送到接收器,Flink 将不会执行任何操作。

并且不要忘记您必须调用 env.execute() 才能发生任何事情。

其他一些事情:

您的水印生成器不允许出现任何乱序,因此 window 将忽略所有乱序事件(因为它们会延迟)。如果您的事件具有严格的升序时间戳,您应该继续使用 AscendingTimestampExtractor; if they can be out-of-order, then a BoundedOutOfOrdernessTimestampExtractor 是合适的。

您的 WindowFunction 是多余的。它只是将聚合器的结果转发到下游,因此您可以将其删除。

您发布了两种不同的 SessionAccumulator 实现。