Flink Autojoin 与行时间列
Flink Autojoin with rowtime column
我有一个具有以下结构的 Flink table:
Id1, Id2, myTimestamp, value
其中行时间基于 myTimestamp
。
我有以下处理效果很好:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
我想调整以前的代码,例如每个 window,我只使用每个 Id2
的最新记录。所以我虽然按如下方式更改代码会起作用:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
但是当我这样做时,出现以下错误:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
看来Flink并没有'understand'我加入的两个table是同一个。
我怎样才能做我想做的事?
您的查询不起作用的原因很少。
SELECT
Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
MyTable,
(SELECT Id2, MAX(myTimestamp) as latestTimestamp
FROM MyTable
GROUP BY Id2
) as RecordsLatest
WHERE
MyTable.Id2 = RecordsLatest.Id2
AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)
有些是由于 Flink 的限制,有些是更基本的。
latestTimestamp
不再是行时间属性。这是因为,它是经过计算的。一旦在表达式中使用行时间属性(包括像 MAX
这样的聚合函数),它们就会失去行时间 属性 并成为常规的 TIMESTAMP
属性。
- 内部查询生成动态 table 来更新其结果。它不是仅附加 table。一旦
Id2
的最大时间戳发生变化,就需要撤回先前的结果行并插入新的结果行。
- 由于
RecordsLatest
是更新 table(而不是仅附加 table)并且 latestTimestamp
不是行时间属性,[=17 的连接=] 和 MyTable
是一个 "regular join"(而不是时间 windowed 连接),它也产生更新结果而不是仅附加结果。常规连接不能产生任何行时间属性,因为无法保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐)并且结果可能需要在将来删除它们。这导致了您看到的错误消息。
- 外部查询的
GROUP BY
子句需要具有行时间属性 rowtime
的仅追加输入 table。但是,连接的输出不是仅附加的,而是更新的,并且 rowtime
属性不能是行时间属性,如前所述。
不幸的是,解决您的任务并不简单,但应该是可能的。
首先,您应该 return 有一个查询,其中 return 每个 (Id1, Id2
) window 具有最大时间戳的行的值:
SELECT
Id1, Id2,
MAX(myTimestamp) AS maxT
ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
MyTable
GROUP BY
Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
ValOfMaxT
函数是一个用户定义的聚合函数,它标识最大时间戳的值并 return 对其进行标识。 rowtime
是新的行时间属性,在 window 的结束时间戳前 1 毫秒。
鉴于此 table,我们称它为 Temp
您可以将下一个查询定义为:
SELECT
Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)
此查询仅对 Id1
和 TUMBLE
window 进行分组。这是一个 TUMBLE
window 因为第一个 HOP
window 已经将每条记录分组为三个 windows 我们不应该再这样做了。相反,我们将第一个查询的结果分组为 10 秒 windows,因为这是第一个查询中 HOP
windows 的幻灯片长度。
我有一个具有以下结构的 Flink table:
Id1, Id2, myTimestamp, value
其中行时间基于 myTimestamp
。
我有以下处理效果很好:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
我想调整以前的代码,例如每个 window,我只使用每个 Id2
的最新记录。所以我虽然按如下方式更改代码会起作用:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
但是当我这样做时,出现以下错误:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
看来Flink并没有'understand'我加入的两个table是同一个。
我怎样才能做我想做的事?
您的查询不起作用的原因很少。
SELECT
Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value
FROM
MyTable,
(SELECT Id2, MAX(myTimestamp) as latestTimestamp
FROM MyTable
GROUP BY Id2
) as RecordsLatest
WHERE
MyTable.Id2 = RecordsLatest.Id2
AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY
Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)
有些是由于 Flink 的限制,有些是更基本的。
latestTimestamp
不再是行时间属性。这是因为,它是经过计算的。一旦在表达式中使用行时间属性(包括像MAX
这样的聚合函数),它们就会失去行时间 属性 并成为常规的TIMESTAMP
属性。- 内部查询生成动态 table 来更新其结果。它不是仅附加 table。一旦
Id2
的最大时间戳发生变化,就需要撤回先前的结果行并插入新的结果行。 - 由于
RecordsLatest
是更新 table(而不是仅附加 table)并且latestTimestamp
不是行时间属性,[=17 的连接=] 和MyTable
是一个 "regular join"(而不是时间 windowed 连接),它也产生更新结果而不是仅附加结果。常规连接不能产生任何行时间属性,因为无法保证输出行的顺序(这是行时间属性的先决条件,因为它们需要与水印对齐)并且结果可能需要在将来删除它们。这导致了您看到的错误消息。 - 外部查询的
GROUP BY
子句需要具有行时间属性rowtime
的仅追加输入 table。但是,连接的输出不是仅附加的,而是更新的,并且rowtime
属性不能是行时间属性,如前所述。
不幸的是,解决您的任务并不简单,但应该是可能的。
首先,您应该 return 有一个查询,其中 return 每个 (Id1, Id2
) window 具有最大时间戳的行的值:
SELECT
Id1, Id2,
MAX(myTimestamp) AS maxT
ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
MyTable
GROUP BY
Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)
ValOfMaxT
函数是一个用户定义的聚合函数,它标识最大时间戳的值并 return 对其进行标识。 rowtime
是新的行时间属性,在 window 的结束时间戳前 1 毫秒。
鉴于此 table,我们称它为 Temp
您可以将下一个查询定义为:
SELECT
Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)
此查询仅对 Id1
和 TUMBLE
window 进行分组。这是一个 TUMBLE
window 因为第一个 HOP
window 已经将每条记录分组为三个 windows 我们不应该再这样做了。相反,我们将第一个查询的结果分组为 10 秒 windows,因为这是第一个查询中 HOP
windows 的幻灯片长度。