Optimized Top-N query using Flink SQL

I'm trying to run a streaming top-n query using Flink SQL, but I can't get the "optimized version" outlined in the Flink docs to work. The setup is as follows:

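I have a Kafka topic where each record contains a tuple (GUID, reached score, maximum possible score). Think of each record as a student taking an assessment, with the tuple representing how many points they achieved.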

What I want is a list of the five GUIDs with the highest score percentage (i.e. ordered by SUM(reached_score) / SUM(max_score)).
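As a point of reference, here is a plain-Java sketch (no Flink involved) of the intended result for a finite batch of records: sum both scores per GUID, rank by the ratio of the sums, and keep the top five. The `RatioTopN` and `Score` names are hypothetical and only for illustration. Note that ranking by the ratio of sums can differ from averaging per-record percentages when a GUID's records have different `max_score` values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class RatioTopN {

    /** One input record: (guid, reached_score, max_score). Hypothetical type for illustration. */
    static final class Score {
        final String guid;
        final long reached;
        final long max;

        Score(String guid, long reached, long max) {
            this.guid = guid;
            this.reached = reached;
            this.max = max;
        }
    }

    /** Returns up to n GUIDs ordered by SUM(reached) / SUM(max), highest first. Assumes SUM(max) > 0. */
    static List<String> topN(List<Score> scores, int n) {
        // Per-GUID running sums, like GROUP BY guid: [0] = SUM(reached_score), [1] = SUM(max_score).
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (Score s : scores) {
            long[] acc = sums.computeIfAbsent(s.guid, k -> new long[2]);
            acc[0] += s.reached;
            acc[1] += s.max;
        }
        // Rank by the ratio of the sums (as a double, like the CAST in the SQL), descending.
        List<Map.Entry<String, long[]>> ranked = new ArrayList<>(sums.entrySet());
        ranked.sort(Comparator.comparingDouble(
                (Map.Entry<String, long[]> e) -> (double) e.getValue()[0] / e.getValue()[1]).reversed());
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, ranked.size()); i++) {
            result.add(ranked.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Score> scores = List.of(
                new Score("a", 97, 200),
                new Score("b", 67, 200),
                new Score("b", 162, 200));
        // a: 97 / 200 = 0.485, b: 229 / 400 = 0.5725
        System.out.println(topN(scores, 5)); // prints [b, a]
    }
}
```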

I first aggregate the scores and group them by GUID:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);

Table aggregatedScores = tableEnv.sqlQuery(
        "SELECT " +
        "  guid, " +
        "  SUM(reached_score) as reached_score, " +
        "  SUM(max_score) as max_score, " +
        "  SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
        "FROM scores " +
        "GROUP BY guid");

tableEnv.registerTable("agg_scores", aggregatedScores);
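The aggregated table is not append-only: every new record for an existing GUID changes that GUID's sums, and in a retract stream this typically shows up as a retraction of the previously emitted aggregate followed by the new one. That update stream is what the Top-N query has to consume. A simplified plain-Java model of this behaviour (class and method names are hypothetical; this is not Flink's operator and ignores details such as state cleanup):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the retract stream produced by "SELECT guid, SUM(...), ... GROUP BY guid". */
public final class GroupAggRetract {

    // Per-GUID state: [0] = SUM(reached_score), [1] = SUM(max_score).
    private final Map<String, long[]> state = new HashMap<>();

    /** Processes one input record and returns the changelog messages it causes. */
    List<String> onRecord(String guid, long reached, long max) {
        List<String> out = new ArrayList<>();
        long[] acc = state.get(guid);
        if (acc == null) {
            // First record for this GUID: nothing to retract yet.
            acc = new long[2];
            state.put(guid, acc);
        } else {
            // The previously emitted aggregate becomes stale, so it is retracted first.
            out.add(format(false, guid, acc));
        }
        acc[0] += reached;
        acc[1] += max;
        // Then the updated aggregate is emitted.
        out.add(format(true, guid, acc));
        return out;
    }

    private static String format(boolean add, String guid, long[] acc) {
        double score = acc[0] / (double) acc[1];
        return "(" + add + "," + guid + "," + acc[0] + "," + acc[1] + "," + score + ")";
    }

    public static void main(String[] args) {
        GroupAggRetract agg = new GroupAggRetract();
        System.out.println(agg.onRecord("b", 67, 200));  // one insert message
        System.out.println(agg.onRecord("b", 162, 200)); // a retraction, then the new aggregate
    }
}
```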

The resulting table contains an unsorted list of aggregated scores. I then tried to feed it into the Top-N query used in the Flink documentation:

Table topN = tableEnv.sqlQuery(
        "SELECT guid, reached_score, max_score, score, row_num " +
        "FROM (" +
        "   SELECT *," +
        "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
        "   FROM agg_scores)" +
        "WHERE row_num <= 5");


tableEnv.toRetractStream(topN, Row.class).print();

Running this query works roughly as expected: whenever the order of the elements changes, it produces multiple updates.

// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)

// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)

// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
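The four messages above follow from including row_num in the output: a row is only valid together with its rank, so a GUID whose position shifts has to be retracted and re-inserted even though its score did not change. A simplified plain-Java model of that changelog (class and method names are hypothetical; it is not how Flink's Top-N operator is implemented, and Flink may emit the messages in a different order):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of "ROW_NUMBER() OVER (ORDER BY score DESC) ... WHERE row_num <= n" with row_num in the output. */
public final class RankedTopNChangelog {

    private final int n;
    private final Map<String, Double> latestScore = new HashMap<>();
    private List<String> emitted = new ArrayList<>();

    RankedTopNChangelog(int n) {
        this.n = n;
    }

    /** Applies an updated aggregate for one GUID and returns the retract/insert messages. */
    List<String> update(String guid, double score) {
        latestScore.put(guid, score);
        List<String> current = currentTopN();
        List<String> out = new ArrayList<>();
        // Retract every previously emitted row whose score OR row_num is no longer current...
        for (String row : emitted) {
            if (!current.contains(row)) out.add("(false," + row + ")");
        }
        // ...and insert every row that was not emitted before.
        for (String row : current) {
            if (!emitted.contains(row)) out.add("(true," + row + ")");
        }
        emitted = current;
        return out;
    }

    /** Current top-n rows formatted as "guid,score,row_num". */
    private List<String> currentTopN() {
        List<Map.Entry<String, Double>> sorted = new ArrayList<>(latestScore.entrySet());
        sorted.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < Math.min(n, sorted.size()); i++) {
            Map.Entry<String, Double> e = sorted.get(i);
            rows.add(e.getKey() + "," + e.getValue() + "," + (i + 1));
        }
        return rows;
    }

    public static void main(String[] args) {
        RankedTopNChangelog topN = new RankedTopNChangelog(5);
        System.out.println(topN.update("a", 0.5));  // [(true,a,0.5,1)]
        System.out.println(topN.update("b", 0.25)); // [(true,b,0.25,2)]
        // b overtakes a, so both rows change their row_num: four messages.
        System.out.println(topN.update("b", 0.75));
    }
}
```

The `6>`, `7>`, ... prefixes in the printed output are the indices of the parallel `print()` subtasks, which is why the retraction of the first entry can appear after its re-insertion in the log above.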

I then removed row_num from the projection, as the documentation suggests:

Table topN = tableEnv.sqlQuery(
    "SELECT guid, reached_score, max_score, score " +
    "FROM (" +
    "   SELECT *," +
    "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
    "   FROM agg_scores)" +
    "WHERE row_num <= 5");

Running it on a similar data set:

// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)

// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

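What I don't understand is this: the two entries marked <-- ??? clearly have to do with the ranking order changing, but isn't that exactly what the optimized top-n query (described further down in the documentation) is supposed to solve?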
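For comparison, here is a simplified model of the changelog one would expect from the variant without row_num (again hypothetical plain Java, not Flink's implementation). Because the rows no longer carry a rank, a row only needs to be retracted when its own values change or when it drops out of the top n. Under this model the second update produces just the retraction of the old aggregate and the insertion of the new one, without the retract/re-add of the unchanged entry marked <-- ???.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified model of the same Top-N query when row_num is NOT part of the output. */
public final class NoRankTopNChangelog {

    private final int n;
    private final Map<String, Double> latestScore = new HashMap<>();
    private Set<String> emitted = new LinkedHashSet<>();

    NoRankTopNChangelog(int n) {
        this.n = n;
    }

    /** Applies an updated aggregate for one GUID and returns the retract/insert messages. */
    List<String> update(String guid, double score) {
        latestScore.put(guid, score);
        Set<String> current = currentTopN();
        List<String> out = new ArrayList<>();
        // Only rows whose values changed or that fell out of the top n are retracted.
        for (String row : emitted) {
            if (!current.contains(row)) out.add("(false," + row + ")");
        }
        // Only rows that are genuinely new are inserted; a pure position shift emits nothing.
        for (String row : current) {
            if (!emitted.contains(row)) out.add("(true," + row + ")");
        }
        emitted = current;
        return out;
    }

    /** Current top-n rows formatted as "guid,score" (no rank). */
    private Set<String> currentTopN() {
        List<Map.Entry<String, Double>> sorted = new ArrayList<>(latestScore.entrySet());
        sorted.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        Set<String> rows = new LinkedHashSet<>();
        for (int i = 0; i < Math.min(n, sorted.size()); i++) {
            Map.Entry<String, Double> e = sorted.get(i);
            rows.add(e.getKey() + "," + e.getValue());
        }
        return rows;
    }

    public static void main(String[] args) {
        NoRankTopNChangelog topN = new NoRankTopNChangelog(5);
        System.out.println(topN.update("a", 0.5));  // [(true,a,0.5)]
        System.out.println(topN.update("b", 0.25)); // [(true,b,0.25)]
        System.out.println(topN.update("b", 0.75)); // [(false,b,0.25), (true,b,0.75)]
    }
}
```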

I looked into this issue and could reproduce it in my local environment as well. After some investigation, the reason is:

"we didn't do such optimization for some scenarios, and your case seems to be one of them"。

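However, based on the user documentation, I think it is a valid request to apply this optimization to your scenario as well. To me this looks like a bug: we claim to perform an optimization, but it does not take effect here.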

I have created an issue, https://issues.apache.org/jira/browse/FLINK-15497, to track this, and hopefully we can fix it in the upcoming 1.9.2 and 1.10.0 releases.

Thanks for reporting this.