Flink SQL: Outer Join with Group By gives unexpected output
I have two Flink dynamic tables, Event and Configuration.
Event has the structure [id, myTimestamp] and Configuration has the structure [id, myValue, myTimestamp].
I am trying to write a Flink SQL query that returns Event.id, Configuration.myValue, or Event.id, null if the id of an Event row does not match any id in Configuration.
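Example of the expected behavior (Event and Configuration start out empty).
The examples should be read as:
[DATA_RECEIVED] => TARGET_TABLE : EXPECTED_OUTPUT
Since the SQL query contains a join, its result is written to an UpsertSink (the first value of each output tuple is the upsert flag).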
[myId-1, 10] => EventTable : [(true, myId-1, null)]
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)]
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-A)]
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, myValue-B)]
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)]
So I wrote this query:
SELECT
Event.id,
Configuration.myValue
FROM
(SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id) as Event
LEFT JOIN
(SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue) as Configuration
ON Event.id = Configuration.id
GROUP BY Event.id, Configuration.myValue
where LATEST_VAL is a UDF that returns the myValue associated with MAX(myTimestamp).
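The question does not include the UDF's code. As a rough illustration of the semantics described above (return the myValue seen together with the largest myTimestamp), here is a minimal Python sketch of the accumulator logic. It is not Flink code: in Flink 1.8 a user-defined aggregate is written in Java or Scala by extending AggregateFunction, and the class and method names below are hypothetical.

```python
class LatestValAccumulator:
    """Hypothetical accumulator mirroring what LATEST_VAL is described to do:
    remember the value that arrived with the largest timestamp."""

    def __init__(self):
        self.value = None
        self.timestamp = None

    def accumulate(self, value, timestamp):
        # Keep the new value only if its timestamp is the newest seen so far.
        # On equal timestamps the later arrival wins (an assumption; the
        # question does not say how ties are handled).
        if self.timestamp is None or timestamp >= self.timestamp:
            self.value = value
            self.timestamp = timestamp

    def get_value(self):
        return self.value


acc = LatestValAccumulator()
for value, ts in [("myValue-A", 15), ("myValue-A", 20), ("myValue-B", 25)]:
    acc.accumulate(value, ts)
print(acc.get_value())  # myValue-B
```

Because the Configuration input is append-only, the sketch needs no retract method. An aggregate fed by a retract stream would also have to undo earlier inputs.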
But I get some behavior I don't understand. Here are the observed results:
[myId-1, 10] => EventTable : [(true, myId-1, null)] // OK
[myId-1, myValue-A, 15] => ConfigurationTable : [(false, myId-1, null), (true, myId-1, myValue-A)] // OK
[myId-1, myValue-A, 20] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-A)] // NOT OK
[myId-1, myValue-B, 25] => ConfigurationTable : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-B)] // NOT OK
[myId-1, 30] => EventTable : [(false, myId-1, null), (true, myId-1, myValue-B)] // OK
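How do you explain the difference between the expected and the observed behavior? Why are there extra outputs (true, myId-1, null), (false, myId-1, null)?
Is it possible to adjust the SQL query to get the desired behavior?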
Note:
- I am using Flink 1.8
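I think the point you are missing is that you are actually joining two retract streams. Even though your input streams are append-only, you run aggregations on them in the subqueries, and those aggregations produce retractions.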
Let's first analyze the results of the subqueries:
Subquery 1:
Query: SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id
Resulting stream:
(true, myId-1, 10L)
(false, myId-1, 10L)
(true, myId-1, 30L)
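To see where these retractions come from, here is a minimal Python sketch (not Flink code) of a non-windowed GROUP BY id with MAX(myTimestamp) over an append-only input. Whenever a group's result changes, it first retracts the previously emitted row and then emits the new one, using the same (flag, row) convention as the stream above. The function name is hypothetical.

```python
def group_max(events):
    """Simulate SELECT id, MAX(ts) FROM Event GROUP BY id on an append-only
    input, emitting (flag, id, max_ts) changelog messages."""
    current = {}  # id -> last emitted MAX(ts)
    out = []
    for key, ts in events:
        old = current.get(key)
        new = ts if old is None else max(old, ts)
        if new == old:
            continue  # in this sketch, an unchanged result emits nothing
        if old is not None:
            out.append((False, key, old))  # retract the previous result
        out.append((True, key, new))       # emit the updated result
        current[key] = new
    return out


for msg in group_max([("myId-1", 10), ("myId-1", 30)]):
    print(msg)
# (True, 'myId-1', 10)
# (False, 'myId-1', 10)
# (True, 'myId-1', 30)
```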
Subquery 2:
Query: SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue
Resulting stream:
(true, "myId-1", "myValue-A", 15L)
(false, "myId-1", "myValue-A", 15L)
(true, "myId-1", "myValue-A", 20L)
(false, "myId-1", "myValue-A", 20L)
(true, "myId-1", "myValue-B", 25L)
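The same mechanism produces the Configuration stream. The Python sketch below is keyed on id alone, which is the grouping this stream reflects. Under GROUP BY id, myValue, the myValue-B row would open a new group instead of retracting the myValue-A row. The function name is hypothetical.

```python
def group_latest(configs):
    """Simulate SELECT id, LATEST_VAL(myValue, ts), MAX(ts) ... GROUP BY id
    on an append-only input, emitting (flag, id, value, max_ts) messages."""
    current = {}  # id -> (value, max_ts) last emitted
    out = []
    for key, value, ts in configs:
        old = current.get(key)
        if old is not None and ts < old[1]:
            continue  # older row: neither LATEST_VAL nor MAX changes
        new = (value, ts)
        if new == old:
            continue  # identical result: nothing to emit in this sketch
        if old is not None:
            out.append((False, key) + old)  # retract the previous result
        out.append((True, key) + new)       # emit the updated result
        current[key] = new
    return out


configs = [
    ("myId-1", "myValue-A", 15),
    ("myId-1", "myValue-A", 20),
    ("myId-1", "myValue-B", 25),
]
for msg in group_latest(configs):
    print(msg)
```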
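After that, you perform the join and the grouping on top of these two retract streams. With that in mind, what actually gets joined and grouped in your example is (subquery row : rows emitted):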
[true, myId-1, 10] : [(true, myId-1, null)]
[true, myId-1, myValue-A, 15] : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 15] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-A, 20] : [(false, myId-1, null), (true, myId-1, myValue-A)]
[false, myId-1, myValue-A, 20] : [(false, myId-1, myValue-A), (true, myId-1, null)]
[true, myId-1, myValue-B, 25] : [(false, myId-1, null), (true, myId-1, myValue-B)]
...
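The trace above can be reproduced with a small Python simulation of the LEFT JOIN alone. This is a sketch, not Flink's join operator. It holds a single Event row fixed (ignoring the Event side's own 10 → 30 update) and takes the Configuration changes projected to (flag, id, myValue). It emits a NULL-padded row whenever the Event row loses its last match, and retracts it when the row gains its first match. In this example the outer GROUP BY Event.id, Configuration.myValue forwards each change unchanged, because every (id, myValue) group only switches between empty and a single row. The function name is hypothetical.

```python
def left_join_updates(left_id, right_changes):
    """Sketch of how a LEFT JOIN reacts to a retract stream on its right side,
    for one left row with key left_id. Right changes are (flag, id, value);
    output rows are (flag, id, value), with None standing for NULL padding."""
    matches = []  # right-side values currently joined with the left row
    # The left row arrives first and has no partner yet: NULL-padded result.
    out = [(True, left_id, None)]
    for flag, key, value in right_changes:
        if key != left_id:
            continue  # not a join partner of this left row
        if flag:
            if not matches:
                out.append((False, left_id, None))  # drop the NULL-padded row
            matches.append(value)
            out.append((True, left_id, value))
        else:
            # Assumes a well-formed retract stream: the value was inserted before.
            matches.remove(value)
            out.append((False, left_id, value))
            if not matches:
                out.append((True, left_id, None))   # back to NULL padding
    return out


changes = [
    (True, "myId-1", "myValue-A"),
    (False, "myId-1", "myValue-A"),
    (True, "myId-1", "myValue-A"),
    (False, "myId-1", "myValue-A"),
    (True, "myId-1", "myValue-B"),
]
for row in left_join_updates("myId-1", changes):
    print(row)
```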
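Overall, as far as I can tell, the query produces correct results. The extra (true, myId-1, null), (false, myId-1, null) pairs appear because the Configuration subquery retracts its previous row before emitting the updated one. In between, the Event row briefly has no match, so the LEFT JOIN emits a NULL-padded row and then retracts it. For each input row, the last emitted row represents the latest value for the given id.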
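To check that conclusion, here is a minimal Python sketch that applies the expected and the observed changelogs from the question to a table. It compares the table state after each input row. It assumes the sink is keyed on id, as the expected output implies: a true flag upserts the row for that id and a false flag deletes it. The real key an UpsertSink receives is not modeled, and the helper names are hypothetical.

```python
def apply_upserts(table, messages):
    """Apply (flag, id, value) messages to a dict keyed on id.
    Assumption: the sink is keyed on id. True upserts; False deletes the key."""
    for flag, key, value in messages:
        if flag:
            table[key] = value
        else:
            table.pop(key, None)  # deleting a missing key is a no-op
    return table


def states_after_each_input(batches):
    """Return a snapshot of the table after each input row's batch of output."""
    table = {}
    return [dict(apply_upserts(table, batch)) for batch in batches]


ID = "myId-1"
expected = [
    [(True, ID, None)],
    [(False, ID, None), (True, ID, "myValue-A")],
    [(False, ID, "myValue-A"), (True, ID, "myValue-A")],
    [(False, ID, "myValue-A"), (True, ID, "myValue-B")],
    [(False, ID, None), (True, ID, "myValue-B")],
]
observed = [
    [(True, ID, None)],
    [(False, ID, None), (True, ID, "myValue-A")],
    [(False, ID, "myValue-A"), (True, ID, None),
     (False, ID, None), (True, ID, "myValue-A")],
    [(False, ID, "myValue-A"), (True, ID, None),
     (False, ID, None), (True, ID, "myValue-B")],
    [(False, ID, None), (True, ID, "myValue-B")],
]

print(states_after_each_input(expected) == states_after_each_input(observed))  # True
```

Under this assumption the extra messages only create transient states inside a batch. After each input row, both changelogs leave the same value for myId-1.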