Apache Flink:未触发 Stream Join Window
Apache Flink: Stream Join Window is not triggered
我正在尝试在 apache flink 中加入两个流以获得一些结果。
我项目的当前状态是,我正在获取推特数据并将其映射到二元组中,其中保存了用户的语言和定义时间 window 内的推文总数.
我对每种语言的推文数量和每种语言的转推数量都做了这些。 tweet/retweet 聚合在其他进程中工作正常。
我现在想获取一次转推数占所有推文数的百分比window。
因此我使用以下代码:
Time windowSize = Time.seconds(15);
// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
.flatMap(new TweetLangFlatMap())
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
.keyBy(new KeyByTweetPostId())
.flatMap(new RetweetLangFlatMap());
// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
tweetsLangSum.join(retweetsLangSum)
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.window(TumblingEventTimeWindows.of(windowSize))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
@Override
public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
String lang = in1.f0;
Double percentage = (double) in1.f1 / in2.f1;
return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
}
})
.print();
当我打印 tweetsLangSum
或 retweetsLangSum
时,输出似乎没问题。我的问题是我从来没有从连接中得到输出。有谁知道为什么?还是我在聚合的第一步中使用 window 函数在连接时出错?
这可能是由于混合了不同的时间语义造成的。 KeyedStream.timeWindow()
方法是一种快捷方式,它根据配置的时间特征创建一个 window 运算符,即如果事件时间启用,则事件时间 window 或处理时间 window 否则。对于连接,您显式定义事件时间 window.
您是否启用了事件时间处理?
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
我正在尝试在 apache flink 中加入两个流以获得一些结果。
我项目的当前状态是,我正在获取推特数据并将其映射到二元组中,其中保存了用户的语言和定义时间 window 内的推文总数. 我对每种语言的推文数量和每种语言的转推数量都做了这些。 tweet/retweet 聚合在其他进程中工作正常。
我现在想获取一次转推数占所有推文数的百分比window。
因此我使用以下代码:
Time windowSize = Time.seconds(15);
// Sum up tweets per language
DataStream<Tuple2<String, Integer>> tweetsLangSum = tweets
.flatMap(new TweetLangFlatMap())
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
// Get retweets out of all tweets per language
DataStream<Tuple2<String, Integer>> retweetsLangMap = tweets
.keyBy(new KeyByTweetPostId())
.flatMap(new RetweetLangFlatMap());
// Sum up retweets per language
DataStream<Tuple2<String, Integer>> retweetsLangSum = retweetsLangMap
.keyBy(0)
.timeWindow(windowSize)
.sum(1);
// ---
tweetsLangSum.join(retweetsLangSum)
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tweet) throws Exception {
return tweet.f0;
}
})
.window(TumblingEventTimeWindows.of(windowSize))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple4<String, Integer, Integer, Double>>() {
@Override
public Tuple4<String, Integer, Integer, Double> join(Tuple2<String, Integer> in1, Tuple2<String, Integer> in2) throws Exception {
String lang = in1.f0;
Double percentage = (double) in1.f1 / in2.f1;
return new Tuple4<>(in1.f0, in1.f1, in2.f1, percentage);
}
})
.print();
当我打印 tweetsLangSum
或 retweetsLangSum
时,输出似乎没问题。我的问题是我从来没有从连接中得到输出。有谁知道为什么?还是我在聚合的第一步中使用 window 函数在连接时出错?
这可能是由于混合了不同的时间语义造成的。 KeyedStream.timeWindow()
方法是一种快捷方式,它根据配置的时间特征创建一个 window 运算符,即如果事件时间启用,则事件时间 window 或处理时间 window 否则。对于连接,您显式定义事件时间 window.
您是否启用了事件时间处理?
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);