How to avoid repeated tuples in Flink slide window join?
For example, there are two streams. One is the advertisements shown to users, whose tuples can be described as (advertiseId, shown timestamp). The other is the click stream: (advertiseId, clicked timestamp). We want to get a joined stream containing all the ads clicked by the user within 20 minutes after being shown. My solution is to join these two streams on a SlidingTimeWindow, but the joined stream contains many repeated tuples. How can I make each joined tuple appear only once in the new stream?
stream1.join(stream2)
       .where(0)
       .equalTo(0)
       .window(SlidingTimeWindows.of(Time.of(30, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
In your code, you defined an overlapping sliding window (the slide is smaller than the window size). If you don't want duplicates, you can define a non-overlapping window by specifying only the window size (the default slide equals the window size).
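To see where the duplicates come from, here is a standalone sketch (no Flink required) that counts how many sliding windows contain both elements of a joined pair; each such window emits the pair once. It assumes windows are aligned to multiples of the slide, which matches the default alignment of the sliding window assigner. The class and method names are illustrative only.

```java
public class SlidingWindowDuplicates {
    // Number of aligned windows [k*slide, k*slide + size) that contain BOTH
    // timestamps t1 <= t2 (all values in milliseconds). A window start s
    // qualifies iff s <= t1 and s > t2 - size.
    static long windowsContainingBoth(long t1, long t2, long size, long slide) {
        long kMax = Math.floorDiv(t1, slide);              // largest start <= t1
        long kMinExclusive = Math.floorDiv(t2 - size, slide); // starts <= t2 - size don't qualify
        return Math.max(0, kMax - kMinExclusive);
    }
}
```

With a 30-minute window sliding every 10 seconds, two simultaneous events fall into 180 windows, so their joined tuple is emitted 180 times; with a tumbling window (slide equal to size) it is emitted exactly once.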
Solution 1:
Let Flink support joining two streams on separate windows, like Spark Streaming does. In this case, apply SlidingTimeWindows(21 mins, 1 min) on the advertisement stream and TumblingTimeWindows(1 min) on the click stream, then join the two windowed streams.
The TumblingTimeWindows avoid duplicated records in the joined stream, and the 21-minute SlidingTimeWindows avoid missing legal clicks. One problem is that the joined stream will contain some illegal clicks (clicks made later than 20 minutes after the ad was shown); this is easily fixed by adding a filter.
MultiWindowsJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
        new MultiWindowsJoinedStreams<>(advertisement, click);

DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams.where(keySelector)
        .window(SlidingTimeWindows.of(Time.of(21, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
        .equalTo(keySelector)
        .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
            private static final long serialVersionUID = -3625150954096822268L;

            @Override
            public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        });

joinedStream = joinedStream.filter(new FilterFunction<Tuple3<String, Long, Long>>() {
    private static final long serialVersionUID = -4325256210808325338L;

    @Override
    public boolean filter(Tuple3<String, Long, Long> value) throws Exception {
        // keep only clicks strictly after the ad was shown and at most 20 000 ms later
        return value.f1 < value.f2 && value.f1 + 20000 >= value.f2;
    }
});
Solution 2:
Flink supports join operations without windows. A join operator implements the TwoInputStreamOperator interface, keeps a (time-length-based) buffer for each of the two streams, and outputs one joined stream.
DataStream<Tuple2<String, Long>> advertisement = env
        .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            private static final long serialVersionUID = -6564495005753073342L;

            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
            }
        }).keyBy(keySelector).assignTimestamps(timestampExtractor1);

DataStream<Tuple2<String, Long>> click = env
        .addSource(new FlinkKafkaConsumer082<String>("click", new SimpleStringSchema(), properties))
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            private static final long serialVersionUID = -6564495005753073342L;

            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] splits = value.split(" ");
                return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
            }
        }).keyBy(keySelector).assignTimestamps(timestampExtractor2);

NoWindowJoinedStreams<Tuple2<String, Long>, Tuple2<String, Long>> joinedStreams =
        new NoWindowJoinedStreams<>(advertisement, click);

DataStream<Tuple3<String, Long, Long>> joinedStream = joinedStreams
        .where(keySelector)
        .buffer(Time.of(20, TimeUnit.SECONDS))
        .equalTo(keySelector)
        .buffer(Time.of(5, TimeUnit.SECONDS))
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
            private static final long serialVersionUID = -5075871109025215769L;

            @Override
            public Tuple3<String, Long, Long> join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                return new Tuple3<>(first.f0, first.f1, second.f1);
            }
        });
I implemented two new join operators based on the Flink streaming API TwoInputTransformation. Please check Flink-stream-join. I will add more tests to this repository.
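The buffer-based idea above can be sketched without Flink at all. The following is a minimal single-threaded illustration, not the actual operator's API: each side keeps a time-bounded buffer, an arriving element is matched against the opposite buffer, and expired entries are evicted. All names here are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BufferedJoinSketch {
    record Event(String key, long ts) {}
    record Joined(String key, long leftTs, long rightTs) {}

    private final Deque<Event> leftBuf = new ArrayDeque<>();
    private final Deque<Event> rightBuf = new ArrayDeque<>();
    private final long leftRetention;   // how long left (ad) elements stay buffered
    private final long rightRetention;  // how long right (click) elements stay buffered
    private final List<Joined> out = new ArrayList<>();

    BufferedJoinSketch(long leftRetention, long rightRetention) {
        this.leftRetention = leftRetention;
        this.rightRetention = rightRetention;
    }

    void onLeft(Event e) {
        evict(e.ts());
        for (Event r : rightBuf)
            if (r.key().equals(e.key())) out.add(new Joined(e.key(), e.ts(), r.ts()));
        leftBuf.addLast(e);
    }

    void onRight(Event e) {
        evict(e.ts());
        for (Event l : leftBuf)
            if (l.key().equals(e.key())) out.add(new Joined(e.key(), l.ts(), e.ts()));
        rightBuf.addLast(e);
    }

    // Drop buffered elements older than their side's retention.
    private void evict(long now) {
        while (!leftBuf.isEmpty() && leftBuf.peekFirst().ts() + leftRetention < now) leftBuf.pollFirst();
        while (!rightBuf.isEmpty() && rightBuf.peekFirst().ts() + rightRetention < now) rightBuf.pollFirst();
    }

    List<Joined> results() { return out; }
}
```

Because each left element meets each right element at most once (on arrival of the later one), no pair is ever duplicated, and eviction bounds the memory by time rather than by window count.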
While looking for a solution to the same problem, I found the "Interval Join" very useful; it does not repeatedly output the same elements. This is the example from the Flink documentation:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
With this, there is no need to define an explicit window; instead, an interval relative to each individual element is used, like this (picture taken from the Flink documentation):
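Applied to the ad/click question, the interval-join semantics boil down to: a click joins an ad with the same id iff its timestamp lies in [adTs + lowerBound, adTs + upperBound]. The following standalone sketch (no Flink needed; names are illustrative) shows that each qualifying pair is produced exactly once, since there are no overlapping windows to duplicate it:

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalJoinSketch {
    // ads and clicks are {id, timestamp} pairs; bounds are relative to the ad
    // timestamp. For the 20-minute use case: lowerBound = 0, upperBound = 1_200_000 ms.
    static List<long[]> join(List<long[]> ads, List<long[]> clicks,
                             long lowerBound, long upperBound) {
        List<long[]> out = new ArrayList<>();
        for (long[] ad : ads)
            for (long[] click : clicks)
                if (ad[0] == click[0]
                        && click[1] >= ad[1] + lowerBound
                        && click[1] <= ad[1] + upperBound)
                    out.add(new long[] {ad[0], ad[1], click[1]});
        return out;
    }
}
```

A real interval join also bounds state by evicting elements once the watermark passes their interval, which this brute-force sketch omits.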