Watermarks in a RichParallelSourceFunction
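I am implementing a SourceFunction that reads data from a database.
The job should be able to resume after being stopped or after a crash (i.e. from savepoints and checkpoints), with the data being processed exactly once.
What I have so far: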
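import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.StoppableFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// Event and JDBCClient are my own classes and are not shown here.
@SerialVersionUID(1L)
class JDBCSource(private val clientConfig: Serializable, private val waitTimeMs: Long)
  extends RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging {

  // Not serializable, so it is created in open() instead of being shipped with the function.
  @transient var client: JDBCClient = _
  @volatile var isRunning: Boolean = true

  def this(clientConfig: Serializable) =
    this(clientConfig, JDBCSource.DEFAULT_WAIT_TIME_MS)

  override def stop(): Unit = {
    isRunning = false
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    client = new JDBCClient
  }

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    while (isRunning) {
      val statement = client.getConnection.createStatement()
      try {
        // Re-reads the whole table on every pass; this is the problem I am asking about.
        val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
        while (resultSet.next()) {
          val name: String = resultSet.getString("name")
          val timestamp: Long = resultSet.getLong("timestamp")
          ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
        }
      } finally statement.close()
      // Wait before polling the table again.
      Thread.sleep(waitTimeMs)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// The default lives in the companion object so the auxiliary constructor can reference it.
object JDBCSource {
  val DEFAULT_WAIT_TIME_MS = 1000L
}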
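How can I make sure I only fetch the rows from the database that have not been processed yet?
I assumed the ctx variable would carry some information about the current watermark, so that I could change my query to something like:
select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark
But it does not have any relevant methods for that. Any ideas on how to solve this problem would be appreciated.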
You have to implement CheckpointedFunction so that you can manage checkpointing yourself. The documentation of the interface is pretty comprehensive, but if you need an example, I advise you to take a look at one.
Essentially, your function must implement CheckpointedFunction#snapshotState to store the state you need in Flink's managed state. Then, when a restore is performed, it reads that same state back in CheckpointedFunction#initializeState.
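As a rough sketch of what that could look like for the source in the question (not a definitive implementation): the source below remembers the largest timestamp it has emitted, stores it in operator list state on every checkpoint, and resumes the query from it after a restore. Several things here are assumptions that are not in the question: the class name `CheckpointedJdbcSource`, the `jdbcUrl` parameter and the plain `DriverManager` connection (standing in for the asker's client class), the state name `"last-timestamp"`, and the `Event` case class. It also assumes timestamps are unique and only ever increase, and it is written as a non-parallel `RichSourceFunction` so that the offset is a single value; splitting the query across parallel subtasks is left out.

```scala
import java.sql.{Connection, DriverManager}

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Stand-in for the Event type from the question (assumption).
case class Event(name: String, timestamp: Long)

// Hypothetical source: jdbcUrl and the "last-timestamp" state name are illustrative.
class CheckpointedJdbcSource(jdbcUrl: String, waitTimeMs: Long)
  extends RichSourceFunction[Event] with CheckpointedFunction {

  @transient private var connection: Connection = _
  @transient private var offsetState: ListState[java.lang.Long] = _
  @volatile private var isRunning = true

  // Largest timestamp emitted so far; this is the value that gets checkpointed.
  private var lastTimestamp: Long = Long.MinValue

  // Called before open(), both on a fresh start and on a restore.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    offsetState = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("last-timestamp", classOf[java.lang.Long]))
    if (context.isRestored) {
      val it = offsetState.get().iterator()
      while (it.hasNext) lastTimestamp = math.max(lastTimestamp, it.next().longValue())
    }
  }

  // Called on every checkpoint or savepoint: persist the current offset.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    offsetState.clear()
    offsetState.add(java.lang.Long.valueOf(lastTimestamp))
  }

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    connection = DriverManager.getConnection(jdbcUrl)
  }

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    val query = connection.prepareStatement(
      "SELECT name, timestamp FROM MYTABLE WHERE timestamp > ? ORDER BY timestamp")
    try {
      while (isRunning) {
        query.setLong(1, lastTimestamp)
        val rows = query.executeQuery()
        while (isRunning && rows.next()) {
          val event = Event(rows.getString("name"), rows.getLong("timestamp"))
          // Emit the record and advance the offset under the checkpoint lock,
          // so a snapshot never sees one without the other.
          ctx.getCheckpointLock.synchronized {
            ctx.collectWithTimestamp(event, event.timestamp)
            lastTimestamp = event.timestamp
          }
        }
        rows.close()
        Thread.sleep(waitTimeMs)
      }
    } finally query.close()
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    if (connection != null) connection.close()
    super.close()
  }
}
```

The offset is kept in the function's own state rather than read from the watermark because the SourceContext only lets a source emit watermarks, not query them. Holding the checkpoint lock around both the emit and the offset update is what keeps the stored offset consistent with the records already sent downstream.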