测试 Flink window
Testing Flink window
我有一个简单的Flink应用,它总结了最后一分钟内具有相同id和时间戳的事件:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));
pixels
.keyBy("id", "timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(dynamoDBSink);
env.execute(jobName);
我正在尝试使用 documentation. I also have looked at this Whosebug question 中推荐的方法测试此应用程序,但添加接收器没有帮助。
我确实有一个@ClassRule,正如我在测试中推荐的那样class。该函数如下所示:
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
CollectSink.values.clear();
Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.keyBy("id","timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());
CollectSink 复制自 documentation。
我做错了什么?还有一种简单的方法可以使用嵌入式kafka测试应用程序吗?
谢谢!
您的测试失败的原因是 window 从未被触发。在 window 可以到达其分配的时间结束之前完成作业 运行s。
这与您处理时间的方式有关。通过指定
.keyBy("id","timestampRoundedToMinutes")
您正在安排同一分钟内具有相同 ID 和时间戳的所有事件在同一 window 中。但是因为您使用的是处理时间 windowing(而不是事件时间 windowing),所以您的 windows 直到测试 运行 的时间才会关闭ning从一分钟到下一分钟越过边界。由于只有四个事件要处理,您的工作不太可能 运行 足够长的时间来实现这种情况。
您应该做的更像是这样:将时间特征设置为事件时间,并提供时间戳提取器和水印分配器。请注意,通过这样做,就不需要按四舍五入到分钟边界的时间戳来键入 - 这是事件时间 windows 无论如何都会做的事情的一部分。
public static void main(String[] args) throws Exception {
...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
env.execute();
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(/* delay to handle out-of-orderness */);
}
@Override
public long extractTimestamp(Event event) {
return event.timestamp;
}
}
有关事件时间、水印和 windowing 的更多信息,请参阅 documentation and the tutorials。
我有一个简单的Flink应用,它总结了最后一分钟内具有相同id和时间戳的事件:
DataStream<String> input = env
.addSource(consumerProps)
.uid("app");
DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));
pixels
.keyBy("id", "timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(dynamoDBSink);
env.execute(jobName);
我正在尝试使用 documentation. I also have looked at this Whosebug question 中推荐的方法测试此应用程序,但添加接收器没有帮助。
我确实有一个@ClassRule,正如我在测试中推荐的那样class。该函数如下所示:
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
CollectSink.values.clear();
Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.keyBy("id","timestampRoundedToMinutes")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());
CollectSink 复制自 documentation。
我做错了什么?还有一种简单的方法可以使用嵌入式kafka测试应用程序吗?
谢谢!
您的测试失败的原因是 window 从未被触发。在 window 可以到达其分配的时间结束之前完成作业 运行s。
这与您处理时间的方式有关。通过指定
.keyBy("id","timestampRoundedToMinutes")
您正在安排同一分钟内具有相同 ID 和时间戳的所有事件在同一 window 中。但是因为您使用的是处理时间 windowing(而不是事件时间 windowing),所以您的 windows 直到测试 运行 的时间才会关闭ning从一分钟到下一分钟越过边界。由于只有四个事件要处理,您的工作不太可能 运行 足够长的时间来实现这种情况。
您应该做的更像是这样:将时间特征设置为事件时间,并提供时间戳提取器和水印分配器。请注意,通过这样做,就不需要按四舍五入到分钟边界的时间戳来键入 - 这是事件时间 windows 无论如何都会做的事情的一部分。
public static void main(String[] args) throws Exception {
...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
.keyBy("id")
.timeWindow(Time.minutes(1))
.sum("constant")
.addSink(new CollectSink());
env.execute();
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(/* delay to handle out-of-orderness */);
}
@Override
public long extractTimestamp(Event event) {
return event.timestamp;
}
}
有关事件时间、水印和 windowing 的更多信息,请参阅 documentation and the tutorials。