Flink broadcast state implement session window inside process 函数
Flink broadcast state implement session window inside process function
我的 flink 应用程序旨在处理来自传感器的物联网数据。
传感器通过网关发送数据。这就是示例数据的样子
case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
来自同一传感器的数据可以来自不同的网关
如果网关与网络断开连接,那么我会收到一个关于此的特殊事件case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
并使用连接到来自传感器的主数据流的广播流
传感器有两种情况可能不发送数据,
- 坏了
- 网关与网络断开连接(将在广播流中接收
GatewayEvents("gwId","disconnected",1617979694)
消息)
如果我收到消息说某个网关与网络断开连接并且通过它发送数据的传感器停止发送数据(例如,在 1 分钟内),我需要创建一个特殊事件
我的半实现实现如下所示:
case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events: sensorData.
.keyBy(_.sensorId)
.connect(broadcastGatewayEventsStream)
.process(...)
无法执行此流程。有任何想法吗?我认为 SessionWindows 会帮助我,但我不知道如何最好地做到这一点
因此,我认为在这种情况下最简单的想法是使用计时器。所以,基本上你可以实现 KeyedCoProcess
功能,如果它收到 GatewayDisconnected
消息,你将注册定时器(处理时间)在所需时间后触发。如果有任何消息到达传感器,您只需删除已注册的计时器,这样它就不会触发。 onTimer
函数内部您可以简单地发出所需的事件,因为如果计时器触发,则意味着时间跨度内没有值到达。
这里要注意的一件事是,如果您 keyBy(_.sensorId)
这意味着将为通过此网关接收到的每个传感器生成事件。如果您只想为网关发出一个事件,您只需将分区更改为 keyBy(_.gatewayId)
.
我的 flink 应用程序旨在处理来自传感器的物联网数据。
传感器通过网关发送数据。这就是示例数据的样子
case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
来自同一传感器的数据可以来自不同的网关
如果网关与网络断开连接,那么我会收到一个关于此的特殊事件case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
并使用连接到来自传感器的主数据流的广播流
传感器有两种情况可能不发送数据,
- 坏了
- 网关与网络断开连接(将在广播流中接收
GatewayEvents("gwId","disconnected",1617979694)
消息)
如果我收到消息说某个网关与网络断开连接并且通过它发送数据的传感器停止发送数据(例如,在 1 分钟内),我需要创建一个特殊事件
我的半实现实现如下所示:
case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events: sensorData.
.keyBy(_.sensorId)
.connect(broadcastGatewayEventsStream)
.process(...)
无法执行此流程。有任何想法吗?我认为 SessionWindows 会帮助我,但我不知道如何最好地做到这一点
因此,我认为在这种情况下最简单的想法是使用计时器。所以,基本上你可以实现 KeyedCoProcess
功能,如果它收到 GatewayDisconnected
消息,你将注册定时器(处理时间)在所需时间后触发。如果有任何消息到达传感器,您只需删除已注册的计时器,这样它就不会触发。 onTimer
函数内部您可以简单地发出所需的事件,因为如果计时器触发,则意味着时间跨度内没有值到达。
这里要注意的一件事是,如果您 keyBy(_.sensorId)
这意味着将为通过此网关接收到的每个传感器生成事件。如果您只想为网关发出一个事件,您只需将分区更改为 keyBy(_.gatewayId)
.