How to avoid repeated tuples in a Flink sliding window join?

For example, there are two streams. One is the advertisements shown to users, whose tuples can be described as (advertiseId, shown timestamp). The other is the click stream, with tuples (advertiseId, click timestamp). We want a joined stream containing all ads that a user clicked within 20 minutes of being shown. My solution is to join the two streams on a SlidingTimeWindow. But the joined stream contains many duplicate tuples. How can I make each joined tuple appear only once in the new stream?

stream1.join(stream2)
        .where(0)
        .equalTo(0)
        .window(SlidingTimeWindows.of(Time.of(30, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))

In your code you define overlapping sliding windows (the slide is smaller than the window size). If you do not want duplicates, you can define non-overlapping windows by specifying only the window size (the default slide equals the window size).
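To see where the duplicates come from: with overlapping sliding windows, every element is assigned to size/slide windows, and a matching ad/click pair is joined once per shared window. A plain-Java sketch of the window-assignment logic (scaled down to a 30 s window with a 10 s slide; the class and method names are hypothetical, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Returns the start times of all sliding windows containing the timestamp,
    // mirroring the usual sliding-window assignment: the latest window start is
    // ts - (ts % slide), and earlier starts follow at multiples of the slide.
    static List<Long> assignWindows(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window size 30 s, slide 10 s: each element lands in 30 / 10 = 3 windows,
        // so a pair that shares all 3 windows is emitted 3 times by the join.
        List<Long> windows = assignWindows(25_000L, 30_000L, 10_000L);
        System.out.println(windows.size()); // 3
    }
}
```

With the question's actual parameters (30-minute window, 10-second slide) each element falls into 180 windows, so each pair can be joined up to 180 times.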

Solution 1:

Make Flink support joining two streams on separate windows, as Spark Streaming does. In this case, apply SlidingTimeWindows(21 minutes, 1 minute) to the advertisement stream and TumblingTimeWindows(1 minute) to the click stream, then join the two windowed streams.

TumblingTimeWindows avoids duplicate records in the joined stream. The 21-minute SlidingTimeWindows avoids missing legitimate clicks. One problem is that the joined stream will contain some invalid clicks (clicks that happened more than 20 minutes after the ad was shown). This is easy to fix by adding a filter.

MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
            new MultiWindowsJoinedStreams<>(advertisement, click);

    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams.where(keySelector)
            .window(SlidingTimeWindows.of(Time.of(21, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
            .equalTo(keySelector)
            .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
            .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                private static final long serialVersionUID = -3625150954096822268L;

                @Override
                public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                    return new Tuple3<>(first.f0, first.f1, second.f1);
                }
            });

    joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() {
        private static final long serialVersionUID = -4325256210808325338L;

        @Override
        public boolean filter(Tuple3<String, Long, Long> value) throws Exception {
            // keep only clicks that occur after the ad is shown
            // and within the validity interval (20 s in this scaled-down example)
            return value.f1 < value.f2 && value.f1 + 20000 >= value.f2;
        }
    });
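The validity predicate inside the filter can be checked in isolation. A minimal plain-Java sketch (the class and method names are hypothetical; the 20 s interval matches the scaled-down code above, not the 20 minutes from the question):

```java
public class ClickValiditySketch {
    // A click is valid if it happens strictly after the ad was shown
    // and no more than 20 seconds (20_000 ms) later.
    static boolean isValidClick(long shownTs, long clickTs) {
        return shownTs < clickTs && shownTs + 20_000L >= clickTs;
    }

    public static void main(String[] args) {
        System.out.println(isValidClick(1_000L, 15_000L));  // true: within 20 s
        System.out.println(isValidClick(1_000L, 25_000L));  // false: clicked too late
        System.out.println(isValidClick(15_000L, 1_000L));  // false: click before the ad
    }
}
```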

Solution 2:

Make Flink support join operations without windows. A join operator implementing the TwoInputStreamOperator interface keeps one buffer (bounded by a time length) for each of the two streams and emits a joined stream.

DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).keyBy(keySelector).assignTimestamps(timestampExtractor1);

    DataStream<Tuple2<String, Long>> click = env
            .addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).keyBy(keySelector).assignTimestamps(timestampExtractor2);

    NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
            new NoWindowJoinedStreams<>(advertisement, click);
    DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
            .where(keySelector)
            .buffer(Time.of(20, TimeUnit.SECONDS))
            .equalTo(keySelector)
            .buffer(Time.of(5, TimeUnit.SECONDS))
            .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                private static final long serialVersionUID = -5075871109025215769L;

                @Override
                public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                    return new Tuple3<>(first.f0, first.f1, second.f1);
                }
            });
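The two-buffer idea behind such an operator can be sketched in plain Java. This is a simplified, single-threaded model, not the actual NoWindowJoinedStreams implementation; class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NoWindowJoinSketch {
    static class Event {
        final String key; final long ts;
        Event(String key, long ts) { this.key = key; this.ts = ts; }
    }

    // On each arriving left-side element: expire buffered right-side elements
    // older than the retention time, then emit one joined pair per match.
    // Each pair is produced exactly once, so no duplicates arise.
    static List<String> joinOne(Event left, List<Event> rightBuffer, long retentionMs) {
        List<String> out = new ArrayList<>();
        Iterator<Event> it = rightBuffer.iterator();
        while (it.hasNext()) {
            Event right = it.next();
            if (right.ts < left.ts - retentionMs) { it.remove(); continue; }
            if (right.key.equals(left.key)) {
                out.add(left.key + ":" + left.ts + "," + right.ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> clicks = new ArrayList<>();
        clicks.add(new Event("ad1", 1_000L));
        clicks.add(new Event("ad1", 30_000L));
        // Ad shown at t=25_000 with a 20 s buffer: the click at 1_000 has expired,
        // so only the click at 30_000 is joined, and the pair is emitted once.
        List<String> joined = joinOne(new Event("ad1", 25_000L), clicks, 20_000L);
        System.out.println(joined); // [ad1:25000,30000]
    }
}
```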

I implemented these two new join operators based on Flink's streaming API TwoInputTransformation. Please check Flink-stream-join. I will add more tests to this repository.

While looking for a solution to the same problem, I found the "Interval Join" very useful; it does not output the same elements repeatedly. This is the example from the Flink documentation:

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

With this, instead of defining an explicit window, an interval relative to each individual element is used (the Flink documentation illustrates this with a picture).
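The per-element interval semantics can be modeled in plain Java: for each left element with timestamp t, join the right elements whose timestamp lies in [t + lowerBound, t + upperBound]. A simplified sketch with the (-2, +1) bounds from the documentation example (class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {
    // For each left timestamp t, emit a pair for every right timestamp r
    // with t + lowerBound <= r <= t + upperBound. Each matching pair is
    // produced exactly once: the interval is anchored to the element itself,
    // so there are no overlapping windows to cause duplicates.
    static List<String> intervalJoin(List<Long> left, List<Long> right,
                                     long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (r >= l + lowerBound && r <= l + upperBound) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // left element 2 joins right timestamps in [0, 3]; left 5 joins [3, 6]
        List<String> joined = intervalJoin(List.of(2L, 5L), List.of(0L, 3L, 6L), -2L, 1L);
        System.out.println(joined); // [2,0, 2,3, 5,3, 5,6]
    }
}
```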