Flink CEP 无法在联合 table 上获得正确的结果
Flink CEP cannot get correct results on a unioned table
我使用 Flink SQL 和 CEP 来识别一些非常简单的模式。但是,我发现了一件奇怪的事情(可能是一个错误)。我有两个示例 tables password_change
和 transfer
,如下所示。
转让
transid,accountnumber,sortcode,value,channel,eventtime,eventtype
1,123,1,100,ONL,2020-01-01T01:00:01Z,transfer
3,123,1,100,ONL,2020-01-01T01:00:02Z,transfer
4,123,1,200,ONL,2020-01-01T01:00:03Z,transfer
5,456,1,200,ONL,2020-01-01T01:00:04Z,transfer
password_change
accountnumber,channel,eventtime,eventtype
123,ONL,2020-01-01T01:00:05Z,password_change
456,ONL,2020-01-01T01:00:06Z,password_change
123,ONL,2020-01-01T01:00:08Z,password_change
123,ONL,2020-01-01T01:00:09Z,password_change
这是我的 SQL 个查询。
首先创建一个临时视图event as
(SELECT accountnumber,rowtime,eventtype FROM password_change WHERE channel='ONL')
UNION ALL
(SELECT accountnumber,rowtime, eventtype FROM transfer WHERE channel = 'ONL' )
rowtime 列是直接从原始 eventtime 列中提取的事件时间,水印周期限制为 1 秒。
然后输出
的查询结果
SELECT * FROM `event`
MATCH_RECOGNIZE (
PARTITION BY accountnumber
ORDER BY rowtime
MEASURES
transfer.eventtype AS event_type,
transfer.rowtime AS transfer_time
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (transfer password_change ) WITHIN INTERVAL '5' SECOND
DEFINE
password_change AS eventtype='password_change',
transfer AS eventtype='transfer'
)
应该输出
123,transfer,2020-01-01T01:00:03Z
456,transfer,2020-01-01T01:00:04Z
但是当 运行 Flink 1.11.1 时我什么也没得到(1.10.1 也没有输出)。
此外,我将模式更改为仅password_change
,它仍然不输出任何内容,但是如果我将模式更改为transfer
,则它会输出几行但不是所有传输行。如果我交换两个 tables 的事件时间,这意味着让 password_changes 先发生,那么模式 password_change
将输出几行,而 transfer
不会。
另一方面,如果我从两个 table 中提取这些列并手动将它们合并为一个 table,然后将它们发送到 Flink,运行 结果是正确的.
我进行了很多搜索和尝试以使其正确,包括更改 SQL 语句、水印、缓冲区超时等,但无济于事。希望这里的任何人都可以提供帮助。谢谢。
10/10/2020 更新:
我使用 Kafka 作为 table 源。 tEnv
是 StreamTableEnvironment。
Kafka kafka=new Kafka()
.version("universal")
.property("bootstrap.servers", "localhost:9092");
tEnv.connect(
kafka.topic("transfer")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
.field("transid",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("value",DataTypes.DECIMAL(38,18))
).createTemporaryTable("transfer");
tEnv.connect(
kafka.topic("pchange")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
).createTemporaryTable("password_change");
感谢@Dawid Wysakowicz 的回答。为了确认这一点,我在transfer
table的末尾添加了4,123,1,200,ONL,2020-01-01T01:00:10Z,transfer
,然后输出就正确了,这意味着确实是水印问题。
所以现在的问题是如何修复它。由于用户不会经常更改 his/her 密码,因此这两个 table 之间的时间间隔是不可避免的。我只需要 UNION ALL table 具有与我手动合并相同的行为。
2020 年 11 月 4 日更新:
WatermarkStrategy with idle sources 可能会有帮助。
问题很可能出在结合 UNION ALL 运算符生成水印的某个地方。您能否分享一下您是如何创建这两个 table 的,包括您如何定义时间属性以及连接器是什么?它可以让我证实我的怀疑。
我认为问题在于其中一个来源停止发出水印。如果 transfer
table(或具有较低时间戳的 table)未完成且未生成任何记录,则它不会发出水印。在发出第四行后,它将发出 Watermark = 3 (4-1 second)
。输入并集的 Watermark 是两者中最小的值。因此,第一个 table 将 pause/hold 值为 Watermark = 3
的 Watermark,因此您看不到原始查询的任何进展,并且您会看到一些为 table 发出的具有更小时间戳的记录。
如果您手动加入两个 tables,您只有一个输入和一个水印来源,因此它会进一步发展,您会看到一些结果。
我使用 Flink SQL 和 CEP 来识别一些非常简单的模式。但是,我发现了一件奇怪的事情(可能是一个错误)。我有两个示例 tables password_change
和 transfer
,如下所示。
转让
transid,accountnumber,sortcode,value,channel,eventtime,eventtype
1,123,1,100,ONL,2020-01-01T01:00:01Z,transfer
3,123,1,100,ONL,2020-01-01T01:00:02Z,transfer
4,123,1,200,ONL,2020-01-01T01:00:03Z,transfer
5,456,1,200,ONL,2020-01-01T01:00:04Z,transfer
password_change
accountnumber,channel,eventtime,eventtype
123,ONL,2020-01-01T01:00:05Z,password_change
456,ONL,2020-01-01T01:00:06Z,password_change
123,ONL,2020-01-01T01:00:08Z,password_change
123,ONL,2020-01-01T01:00:09Z,password_change
这是我的 SQL 个查询。
首先创建一个临时视图event as
(SELECT accountnumber,rowtime,eventtype FROM password_change WHERE channel='ONL')
UNION ALL
(SELECT accountnumber,rowtime, eventtype FROM transfer WHERE channel = 'ONL' )
rowtime 列是直接从原始 eventtime 列中提取的事件时间,水印周期限制为 1 秒。
然后输出
的查询结果SELECT * FROM `event`
MATCH_RECOGNIZE (
PARTITION BY accountnumber
ORDER BY rowtime
MEASURES
transfer.eventtype AS event_type,
transfer.rowtime AS transfer_time
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (transfer password_change ) WITHIN INTERVAL '5' SECOND
DEFINE
password_change AS eventtype='password_change',
transfer AS eventtype='transfer'
)
应该输出
123,transfer,2020-01-01T01:00:03Z
456,transfer,2020-01-01T01:00:04Z
但是当 运行 Flink 1.11.1 时我什么也没得到(1.10.1 也没有输出)。
此外,我将模式更改为仅password_change
,它仍然不输出任何内容,但是如果我将模式更改为transfer
,则它会输出几行但不是所有传输行。如果我交换两个 tables 的事件时间,这意味着让 password_changes 先发生,那么模式 password_change
将输出几行,而 transfer
不会。
另一方面,如果我从两个 table 中提取这些列并手动将它们合并为一个 table,然后将它们发送到 Flink,运行 结果是正确的.
我进行了很多搜索和尝试以使其正确,包括更改 SQL 语句、水印、缓冲区超时等,但无济于事。希望这里的任何人都可以提供帮助。谢谢。
10/10/2020 更新:
我使用 Kafka 作为 table 源。 tEnv
是 StreamTableEnvironment。
Kafka kafka=new Kafka()
.version("universal")
.property("bootstrap.servers", "localhost:9092");
tEnv.connect(
kafka.topic("transfer")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
.field("transid",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("value",DataTypes.DECIMAL(38,18))
).createTemporaryTable("transfer");
tEnv.connect(
kafka.topic("pchange")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
).createTemporaryTable("password_change");
感谢@Dawid Wysakowicz 的回答。为了确认这一点,我在transfer
table的末尾添加了4,123,1,200,ONL,2020-01-01T01:00:10Z,transfer
,然后输出就正确了,这意味着确实是水印问题。
所以现在的问题是如何修复它。由于用户不会经常更改 his/her 密码,因此这两个 table 之间的时间间隔是不可避免的。我只需要 UNION ALL table 具有与我手动合并相同的行为。
2020 年 11 月 4 日更新:
WatermarkStrategy with idle sources 可能会有帮助。
问题很可能出在结合 UNION ALL 运算符生成水印的某个地方。您能否分享一下您是如何创建这两个 table 的,包括您如何定义时间属性以及连接器是什么?它可以让我证实我的怀疑。
我认为问题在于其中一个来源停止发出水印。如果 transfer
table(或具有较低时间戳的 table)未完成且未生成任何记录,则它不会发出水印。在发出第四行后,它将发出 Watermark = 3 (4-1 second)
。输入并集的 Watermark 是两者中最小的值。因此,第一个 table 将 pause/hold 值为 Watermark = 3
的 Watermark,因此您看不到原始查询的任何进展,并且您会看到一些为 table 发出的具有更小时间戳的记录。
如果您手动加入两个 tables,您只有一个输入和一个水印来源,因此它会进一步发展,您会看到一些结果。