测试 Flink window

Testing Flink window

我有一个简单的Flink应用,它总结了最后一分钟内具有相同id和时间戳的事件:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Pixel> pixels = input.map(record -> mapper.readValue(record, Pixel.class));

pixels
        .keyBy("id", "timestampRoundedToMinutes")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(dynamoDBSink);

env.execute(jobName);

我正在尝试使用 documentation. I also have looked at this Whosebug question 中推荐的方法测试此应用程序,但添加接收器没有帮助。

我确实有一个@ClassRule,正如我在测试中推荐的那样class。该函数如下所示:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

CollectSink.values.clear();

Pixel testPixel1 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel2 = Pixel.builder().id(2).timestampRoundedToMinutes("202002261220").constant(1).build();
Pixel testPixel3 = Pixel.builder().id(1).timestampRoundedToMinutes("202002261219").constant(1).build();
Pixel testPixel4 = Pixel.builder().id(3).timestampRoundedToMinutes("202002261220").constant(1).build();

env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
    .keyBy("id","timestampRoundedToMinutes")
    .timeWindow(Time.minutes(1))
    .sum("constant")
    .addSink(new CollectSink());

JobExecutionResult result = env.execute("AggregationTest");
assertNotEquals(0, CollectSink.values.size());

CollectSink 复制自 documentation

我做错了什么?还有一种简单的方法可以使用嵌入式kafka测试应用程序吗?

谢谢!

您的测试失败的原因是 window 从未被触发。在 window 可以到达其分配的时间结束之前完成作业 运行s。

这与您处理时间的方式有关。通过指定

.keyBy("id","timestampRoundedToMinutes")

您正在安排同一分钟内具有相同 ID 和时间戳的所有事件在同一 window 中。但是因为您使用的是处理时间 windowing(而不是事件时间 windowing),所以您的 windows 直到测试 运行 的时间才会关闭ning从一分钟到下一分钟越过边界。由于只有四个事件要处理,您的工作不太可能 运行 足够长的时间来实现这种情况。

您应该做的更像是这样:将时间特征设置为事件时间,并提供时间戳提取器和水印分配器。请注意,通过这样做,就不需要按四舍五入到分钟边界的时间戳来键入 - 这是事件时间 windows 无论如何都会做的事情的一部分。

public static void main(String[] args) throws Exception {
    ...

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.fromElements(testPixel1, testPixel2, testPixel3, testPixel4)
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(new CollectSink());

    env.execute();
}

private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
    public TimestampsAndWatermarks() {
        super(/* delay to handle out-of-orderness */);
    }

    @Override
    public long extractTimestamp(Event event) {
        return event.timestamp;
    }
}

有关事件时间、水印和 windowing 的更多信息,请参阅 documentation and the tutorials