Flink Autojoin 与行时间列

Flink Autojoin with rowtime column

我有一个具有以下结构的 Flink table:

Id1, Id2, myTimestamp, value

其中行时间基于 myTimestamp

我有以下处理效果很好:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

我想调整以前的代码,例如每个 window,我只使用每个 Id2 的最新记录。所以我虽然按如下方式更改代码会起作用:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable, " + 
                "(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
                "WHERE  MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

但是当我这样做时,出现以下错误:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

看来Flink并没有'understand'我加入的两个table是同一个。

我怎样才能做我想做的事?

您的查询不起作用的原因很少。

SELECT 
  Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value 
FROM 
  MyTable, 
  (SELECT Id2, MAX(myTimestamp) as latestTimestamp 
   FROM MyTable 
   GROUP BY Id2
  ) as RecordsLatest
WHERE 
  MyTable.Id2 = RecordsLatest.Id2 
  AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY 
  Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)

有些是由于 Flink 的限制,有些是更基本的。

  1. latestTimestamp 不再是行时间属性。这是因为,它是经过计算的。一旦在表达式中使用行时间属性(包括像 MAX 这样的聚合函数),它们就会失去行时间 属性 并成为常规的 TIMESTAMP 属性。
  2. 内部查询生成动态 table 来更新其结果。它不是仅附加 table。一旦 Id2 的最大时间戳发生变化,就需要撤回先前的结果行并插入新的结果行。
  3. 由于 RecordsLatest 是更新 table(而不是仅附加 table)并且 latestTimestamp 不是行时间属性,[=17 的连接=] 和 MyTable 是一个 "regular join"(而不是时间 windowed 连接),它也产生更新结果而不是仅附加结果。常规连接不能产生任何行时间属性,因为无法保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐)并且结果可能需要在将来删除它们。这导致了您看到的错误消息。
  4. 外部查询的 GROUP BY 子句需要具有行时间属性 rowtime 的仅追加输入 table。但是,连接的输出不是仅附加的,而是更新的,并且 rowtime 属性不能是行时间属性,如前所述。

不幸的是,解决您的任务并不简单,但应该是可能的。

首先,您应该 return 有一个查询,其中 return 每个 (Id1, Id2) window 具有最大时间戳的行的值:

SELECT 
  Id1, Id2,
  MAX(myTimestamp) AS maxT
  ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
  HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
  MyTable
GROUP BY
  Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)

ValOfMaxT 函数是一个用户定义的聚合函数,它标识最大时间戳的值并 return 对其进行标识。 rowtime 是新的行时间属性,在 window 的结束时间戳前 1 毫秒。

鉴于此 table,我们称它为 Temp 您可以将下一个查询定义为:


SELECT
  Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
  Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)

此查询仅对 Id1TUMBLE window 进行分组。这是一个 TUMBLE window 因为第一个 HOP window 已经将每条记录分组为三个 windows 我们不应该再这样做了。相反,我们将第一个查询的结果分组为 10 秒 windows,因为这是第一个查询中 HOP windows 的幻灯片长度。