如何连接两个流式 Flink 表并保留时间戳信息
How to join two streaming Flink tables and preserve timestamp information
我有两个(流式)Tables,一个有事件时间列,一个没有。我想使用 Table API 加入这些,但还没有想出一种方法来做到这一点,同时保留时间戳信息。
考虑以下可以在 Scala REPL 中执行的 MWE:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
val streamEnv: StreamExecutionEnvironment = senv
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
val table1 = streamEnv.addSource(new SourceFunction[(Long, String)] {
override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
ctx.collectWithTimestamp((1L, "hello"), 1L)
}
override def cancel(): Unit = ???
}).toTable(tableEnv, 'ts.rowtime, 'column1)
val table2 = streamEnv.addSource(new SourceFunction[(String, String)] {
override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
ctx.collect(("hello", "world"))
}
override def cancel(): Unit = ???
}).toTable(tableEnv, 'column2, 'column3)
def checkTable(table: Table): Unit = {
table
.toAppendStream[Row]
.process(new ProcessFunction[Row, Int] {
override def processElement(value: Row, ctx: ProcessFunction[Row, Int]#Context, out: Collector[Int]): Unit = {
out.collect((ctx.timestamp() / 1000).toInt)
}
})
streamEnv.execute()
}
checkTable(table1)
checkTable(table1.join(table2, 'column1 === 'column2).select('column1, 'column2, 'column3))
第一个 table 显然分配了事件时间,因此对 checkTable
的第一次调用成功。 (尽管奇怪的是,这仅在从数据流创建 table 时显式提供 .rowtime
标记时有效)。
在第一个和第二个 table 的连接上调用 checkTable
结果
Caused by: java.lang.NullPointerException
at scala.Predef$.Long2long(Predef.scala:363)
at $anon.processElement(<console>:81)
at $anon.processElement(<console>:79)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
即 ctx.timestamp()
在 ProcessFunction.processElement
中为空。我可以通过在联接结果上调用例如 .assignAscendingTimestamps(...)
来强制执行时间戳,但我认为这不安全,因为我不知道联接如何影响排序。是否可以使此连接工作 和 保留时间戳?
通用连接运算符无法保留事件时间戳 属性,因为记录可以按任何顺序连接。
在左侧进入连接运算符的记录可能会与两天前从右侧引入的记录连接。反过来也可能发生同样的情况,即来自左侧的记录在来自右侧的匹配记录到达之前等待一段时间。没有允许发出有意义的水印的界限。因此,所有输入记录的事件时间 属性 都将丢失,它们只能被视为常规 TIMESTAMP
属性。
但是,您可以使用 windowed join,即基本上是一个额外的连接条件来限制记录之间的延迟:
.where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
间隔可以自由选择。在这种情况下,Flink 推断出时间戳和水印的边界,并且能够保留事件时间属性。
我有两个(流式)Tables,一个有事件时间列,一个没有。我想使用 Table API 加入这些,但还没有想出一种方法来做到这一点,同时保留时间戳信息。
考虑以下可以在 Scala REPL 中执行的 MWE:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
val streamEnv: StreamExecutionEnvironment = senv
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
val table1 = streamEnv.addSource(new SourceFunction[(Long, String)] {
override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit = {
ctx.collectWithTimestamp((1L, "hello"), 1L)
}
override def cancel(): Unit = ???
}).toTable(tableEnv, 'ts.rowtime, 'column1)
val table2 = streamEnv.addSource(new SourceFunction[(String, String)] {
override def run(ctx: SourceFunction.SourceContext[(String, String)]): Unit = {
ctx.collect(("hello", "world"))
}
override def cancel(): Unit = ???
}).toTable(tableEnv, 'column2, 'column3)
def checkTable(table: Table): Unit = {
table
.toAppendStream[Row]
.process(new ProcessFunction[Row, Int] {
override def processElement(value: Row, ctx: ProcessFunction[Row, Int]#Context, out: Collector[Int]): Unit = {
out.collect((ctx.timestamp() / 1000).toInt)
}
})
streamEnv.execute()
}
checkTable(table1)
checkTable(table1.join(table2, 'column1 === 'column2).select('column1, 'column2, 'column3))
第一个 table 显然分配了事件时间,因此对 checkTable
的第一次调用成功。 (尽管奇怪的是,这仅在从数据流创建 table 时显式提供 .rowtime
标记时有效)。
在第一个和第二个 table 的连接上调用 checkTable
结果
Caused by: java.lang.NullPointerException
at scala.Predef$.Long2long(Predef.scala:363)
at $anon.processElement(<console>:81)
at $anon.processElement(<console>:79)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
即 ctx.timestamp()
在 ProcessFunction.processElement
中为空。我可以通过在联接结果上调用例如 .assignAscendingTimestamps(...)
来强制执行时间戳,但我认为这不安全,因为我不知道联接如何影响排序。是否可以使此连接工作 和 保留时间戳?
通用连接运算符无法保留事件时间戳 属性,因为记录可以按任何顺序连接。
在左侧进入连接运算符的记录可能会与两天前从右侧引入的记录连接。反过来也可能发生同样的情况,即来自左侧的记录在来自右侧的匹配记录到达之前等待一段时间。没有允许发出有意义的水印的界限。因此,所有输入记录的事件时间 属性 都将丢失,它们只能被视为常规 TIMESTAMP
属性。
但是,您可以使用 windowed join,即基本上是一个额外的连接条件来限制记录之间的延迟:
.where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
间隔可以自由选择。在这种情况下,Flink 推断出时间戳和水印的边界,并且能够保留事件时间属性。