Flink SQL CURRENT_TIMESTAMP 始终return 相同的值
Flink SQL CURRENT_TIMESTAMP always return the same value
我正在使用Flink 1.8中的Flink SQL API。我有两个流表 Table1 和 Table2。
如果我们将 receivedTime
定义为 Table 中接收数据的时间,我想加入 Table1 和 Table2(在某些 id
) 并仅保留 Table1.receivedTime > Table2.receivedTime
.
所在的行
首先,我尝试使用 Flink SQL CURRENT_TIMESTAMP
来做到这一点:
NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但看起来 CURRENT_TIMESTAMP
总是 return 评估查询时的时间戳。 (看起来 CURRENT_TIMESTAMP 此时被替换为当前日期,而不是动态值)。我觉得这种行为很奇怪,这正常吗?
我尝试的第二种解决方案是使用Flink的处理时间:
NEW_TABLE1 : SELECT *, proctime as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但在这种情况下,处理时间似乎是在执行查询时计算的。然后,在我的 JOIN 查询中,两个处理时间总是相等的。
做我想做的事情的正确方法是什么?
Flink 和 Flink SQL 支持两种不同的时间概念:处理时间 是处理事件的时间(或者换句话说,处理时间您的查询正在执行),而 事件时间 基于事件中记录的时间戳。 here in the documentation 描述了 Table 和 SQL API 中如何反映这种区别。
要获得所需内容,您首先需要安排在两个 table 中创建数据的任何进程,以便在每条记录中包含一个事件时间戳。然后你需要配置你的 tables 以便 Flink SQL 知道每个 table 中的哪个字段将用作 rowtime 属性,你还需要指定如何完成 watermarking。
例如,如果您使用的是 SQL 客户端,那么您的架构可能看起来像这样,以指示 rideTime 字段应该用作事件时间戳以及周期性的有界出- 使用延迟 60 秒的顺序水印策略:
schema:
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
如果您不使用 SQL 客户端,请参阅文档以获取示例,无论是否使用 DataStream to Table conversion or TableSources。
更新:
据我所知,您真正喜欢的是使用摄取时间,但 Flink SQL 不支持摄取时间。您必须配置作业以使用 TimeCharacteristic.EventTime
,实施时间戳提取器和水印生成器,并调用 assignTimestampsAndWatermarks
.
如果您不想在每个事件中都使用时间戳字段,您的时间戳提取器可以如下所示:
AssignerWithPeriodicWatermarks<Event> assigner = new AscendingTimestampExtractor<Event> {
@Override
public long extractAscendingTimestamp(Event element) {
return System.currentTimeMillis();
}
};
我正在使用Flink 1.8中的Flink SQL API。我有两个流表 Table1 和 Table2。
如果我们将 receivedTime
定义为 Table 中接收数据的时间,我想加入 Table1 和 Table2(在某些 id
) 并仅保留 Table1.receivedTime > Table2.receivedTime
.
首先,我尝试使用 Flink SQL CURRENT_TIMESTAMP
来做到这一点:
NEW_TABLE1 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, CURRENT_TIMESTAMP as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但看起来 CURRENT_TIMESTAMP
总是 return 评估查询时的时间戳。 (看起来 CURRENT_TIMESTAMP 此时被替换为当前日期,而不是动态值)。我觉得这种行为很奇怪,这正常吗?
我尝试的第二种解决方案是使用Flink的处理时间:
NEW_TABLE1 : SELECT *, proctime as receivedTime FROM TABLE1
NEW_TABLE2 : SELECT *, proctime as receivedTime FROM TABLE2
RESULT : SELECT * FROM NEW_TABLE1 JOIN NEW_TABLE2
WHERE NEW_TABLE1.id = NEW_TABLE2.id
AND NEW_TABLE1.receivedTime > NEW_TABLE2.receivedTime
但在这种情况下,处理时间似乎是在执行查询时计算的。然后,在我的 JOIN 查询中,两个处理时间总是相等的。
做我想做的事情的正确方法是什么?
Flink 和 Flink SQL 支持两种不同的时间概念:处理时间 是处理事件的时间(或者换句话说,处理时间您的查询正在执行),而 事件时间 基于事件中记录的时间戳。 here in the documentation 描述了 Table 和 SQL API 中如何反映这种区别。
要获得所需内容,您首先需要安排在两个 table 中创建数据的任何进程,以便在每条记录中包含一个事件时间戳。然后你需要配置你的 tables 以便 Flink SQL 知道每个 table 中的哪个字段将用作 rowtime 属性,你还需要指定如何完成 watermarking。
例如,如果您使用的是 SQL 客户端,那么您的架构可能看起来像这样,以指示 rideTime 字段应该用作事件时间戳以及周期性的有界出- 使用延迟 60 秒的顺序水印策略:
schema:
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
如果您不使用 SQL 客户端,请参阅文档以获取示例,无论是否使用 DataStream to Table conversion or TableSources。
更新:
据我所知,您真正喜欢的是使用摄取时间,但 Flink SQL 不支持摄取时间。您必须配置作业以使用 TimeCharacteristic.EventTime
,实施时间戳提取器和水印生成器,并调用 assignTimestampsAndWatermarks
.
如果您不想在每个事件中都使用时间戳字段,您的时间戳提取器可以如下所示:
AssignerWithPeriodicWatermarks<Event> assigner = new AscendingTimestampExtractor<Event> {
@Override
public long extractAscendingTimestamp(Event element) {
return System.currentTimeMillis();
}
};