Flink windows 连接到广播流后不触发
Flink windows do not fire after connecting to broadcast stream
我尝试使用 BroadcastStatePattern 来扩展我的应用程序的功能。
这里有一些代码。 Main
// .... ///
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, BCA]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])
// Broadcast source
val broadcastSource = env
.addSource(new FlinkKinesisConsumer[String](s"BROADCAST", new SimpleStringSchema, consumerConfig))
val broadcastSourceGatewayEvents = broadcastSource
.filter(_.contains("someText"))
.map(json => read[BCASTDATACLASS](json))
val broadcastGatewayEventsConfigurations = broadcastSourceGatewayEvents.broadcast(gatewayBroadcastStateDescriptor)
// packet source
val packetSource = env
.addSource(
new FlinkKinesisConsumer[String](s"PACKETS", new SimpleStringSchema, consumerConfig))
val packets = packetSource.disableChaining()
.map(json => read[MAINDATACLASS](json))
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness[MAINDATACLASS](Duration.ofSeconds(2))
.withTimestampAssigner(new PacketWatermarkGenerator))
.timeWindowAll(Time.seconds(2))
.process(new OrderPacketWindowFunction)
.disableChaining()
// connect MainDataSource with BroadcastDataSource
val gwEnrichedPackets = packets
.keyBy(_.gatewayId)
.connect(broadcastGatewayEventsConfigurations)
.process(new EnrichingPackets)
我的 window
函数(在这个例子中什么都不做,只是进一步转发数据)
//....//
class EnrichingPackets()
extends KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]
with LazyLogging {
private lazy val gatewayEventsStateDescriptor =
new MapStateDescriptor[String, BCASTDATACLASS]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])
override def processBroadcastElement( // stream element, context, collector to emit resulting elements
broadcastInput: BCASTDATACLASS,
ctx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]#Context,
out: Collector[MAINDATACLASS]): Unit = {
val gatewayEvents = ctx.getBroadcastState(gatewayEventsStateDescriptor)
println("OK")
}
override def processElement(
packetInput: MAINDATACLASS,
readOnlyCtx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, GatewayEvent, MAINDATACLASS]#ReadOnlyContext,
out: Collector[MAINDATACLASS]): Unit = {
// get read-only broadcast state
val gatewayEvents = readOnlyCtx.getBroadcastState(gatewayEventsStateDescriptor)
out.collect(packetInput)
}
}
连接数据和配置流后,我将打开 window 并进行一些处理。
但是当我从 gwEnrichedPackets
打开 window 时什么也没发生,我可以看到(flink ui)只有传入的消息进入 window。即使使用 session windows
并停止数据流 - windows 也不会触发。
allowedLateness
和sideOutputLateData
对问题的排查没有帮助
有趣的一点是,如果我从 packets
打开 windows - 一切正常。
// val sessionWindows = gwEnrichedPackets - NOT works
// val sessionWindows = packets - Works
val sessionWindows = gwEnrichedPackets
.keyBy(_.tag.tagId)
.timeWindow(Time.seconds(20))
//.window(EventTimeSessionWindows.withGap(Time.seconds(120)))
//.allowedLateness(Time.seconds(12000))
//.sideOutputLateData(new OutputTag[MAINDATACLASS]("late-readings"))
.process(new DetectTagGatewayDisconnections)
val lateStream = sessionWindows
.getSideOutput(new OutputTag[MAINDATACLASS]("late-readings"))
lateStream.print()
sessionWindows.print()
我做错了什么?
在这种情况下,问题在于水印,您仅将水印分配给其中一个流,当给定运算符的输入上有多个流时,Flink 总是选择最低的水印。
因此,在您的情况下,Flink 必须在 packets
生成的水印和广播流生成的水印之间进行选择,其中之一将始终是 Long.MinVal
(因为控制流没有水印生成器),因此它将始终选择 Long.MinVal
,因此 windows 永远不会进行。
在这种情况下,您只需将 Watermark 分配器添加到 gwEnrichedPackets
流即可解决问题。
我尝试使用 BroadcastStatePattern 来扩展我的应用程序的功能。
这里有一些代码。 Main
// .... ///
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, BCA]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])
// Broadcast source
val broadcastSource = env
.addSource(new FlinkKinesisConsumer[String](s"BROADCAST", new SimpleStringSchema, consumerConfig))
val broadcastSourceGatewayEvents = broadcastSource
.filter(_.contains("someText"))
.map(json => read[BCASTDATACLASS](json))
val broadcastGatewayEventsConfigurations = broadcastSourceGatewayEvents.broadcast(gatewayBroadcastStateDescriptor)
// packet source
val packetSource = env
.addSource(
new FlinkKinesisConsumer[String](s"PACKETS", new SimpleStringSchema, consumerConfig))
val packets = packetSource.disableChaining()
.map(json => read[MAINDATACLASS](json))
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness[MAINDATACLASS](Duration.ofSeconds(2))
.withTimestampAssigner(new PacketWatermarkGenerator))
.timeWindowAll(Time.seconds(2))
.process(new OrderPacketWindowFunction)
.disableChaining()
// connect MainDataSource with BroadcastDataSource
val gwEnrichedPackets = packets
.keyBy(_.gatewayId)
.connect(broadcastGatewayEventsConfigurations)
.process(new EnrichingPackets)
我的 window
函数(在这个例子中什么都不做,只是进一步转发数据)
//....//
class EnrichingPackets()
extends KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]
with LazyLogging {
private lazy val gatewayEventsStateDescriptor =
new MapStateDescriptor[String, BCASTDATACLASS]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])
override def processBroadcastElement( // stream element, context, collector to emit resulting elements
broadcastInput: BCASTDATACLASS,
ctx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]#Context,
out: Collector[MAINDATACLASS]): Unit = {
val gatewayEvents = ctx.getBroadcastState(gatewayEventsStateDescriptor)
println("OK")
}
override def processElement(
packetInput: MAINDATACLASS,
readOnlyCtx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, GatewayEvent, MAINDATACLASS]#ReadOnlyContext,
out: Collector[MAINDATACLASS]): Unit = {
// get read-only broadcast state
val gatewayEvents = readOnlyCtx.getBroadcastState(gatewayEventsStateDescriptor)
out.collect(packetInput)
}
}
连接数据和配置流后,我将打开 window 并进行一些处理。
但是当我从 gwEnrichedPackets
打开 window 时什么也没发生,我可以看到(flink ui)只有传入的消息进入 window。即使使用 session windows
并停止数据流 - windows 也不会触发。
allowedLateness
和sideOutputLateData
对问题的排查没有帮助
有趣的一点是,如果我从 packets
打开 windows - 一切正常。
// val sessionWindows = gwEnrichedPackets - NOT works
// val sessionWindows = packets - Works
val sessionWindows = gwEnrichedPackets
.keyBy(_.tag.tagId)
.timeWindow(Time.seconds(20))
//.window(EventTimeSessionWindows.withGap(Time.seconds(120)))
//.allowedLateness(Time.seconds(12000))
//.sideOutputLateData(new OutputTag[MAINDATACLASS]("late-readings"))
.process(new DetectTagGatewayDisconnections)
val lateStream = sessionWindows
.getSideOutput(new OutputTag[MAINDATACLASS]("late-readings"))
lateStream.print()
sessionWindows.print()
我做错了什么?
在这种情况下,问题在于水印,您仅将水印分配给其中一个流,当给定运算符的输入上有多个流时,Flink 总是选择最低的水印。
因此,在您的情况下,Flink 必须在 packets
生成的水印和广播流生成的水印之间进行选择,其中之一将始终是 Long.MinVal
(因为控制流没有水印生成器),因此它将始终选择 Long.MinVal
,因此 windows 永远不会进行。
在这种情况下,您只需将 Watermark 分配器添加到 gwEnrichedPackets
流即可解决问题。