Flink windows do not fire after connecting to broadcast stream

I am trying to use the Broadcast State Pattern to extend the functionality of my application. Here is some code. Main:

// .... ///
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, BCASTDATACLASS]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])


// Broadcast source
val broadcastSource = env
  .addSource(new FlinkKinesisConsumer[String](s"BROADCAST", new SimpleStringSchema, consumerConfig))

val broadcastSourceGatewayEvents = broadcastSource
  .filter(_.contains("someText"))
  .map(json => read[BCASTDATACLASS](json))

val broadcastGatewayEventsConfigurations = broadcastSourceGatewayEvents.broadcast(gatewayBroadcastStateDescriptor)

// packet source
val packetSource = env
  .addSource(
    new FlinkKinesisConsumer[String](s"PACKETS", new SimpleStringSchema, consumerConfig))

val packets = packetSource.disableChaining()
  .map(json => read[MAINDATACLASS](json))
  .assignTimestampsAndWatermarks(WatermarkStrategy
    .forBoundedOutOfOrderness[MAINDATACLASS](Duration.ofSeconds(2))
    .withTimestampAssigner(new PacketWatermarkGenerator))
  .timeWindowAll(Time.seconds(2))
  .process(new OrderPacketWindowFunction)
  .disableChaining()

// connect MainDataSource with BroadcastDataSource
val gwEnrichedPackets = packets
  .keyBy(_.gatewayId)
  .connect(broadcastGatewayEventsConfigurations)
  .process(new EnrichingPackets)

My broadcast process function (in this example it does nothing and simply forwards the data):

//....//

class EnrichingPackets()
  extends KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]
    with LazyLogging {

  private lazy val gatewayEventsStateDescriptor =
    new MapStateDescriptor[String, BCASTDATACLASS]("gatewayEvents", classOf[String], classOf[BCASTDATACLASS])

  override def processBroadcastElement( // stream element, context, collector to emit resulting elements
      broadcastInput: BCASTDATACLASS,
      ctx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]#Context,
      out: Collector[MAINDATACLASS]): Unit = {
    // writable broadcast state; nothing is stored in this example
    val gatewayEvents = ctx.getBroadcastState(gatewayEventsStateDescriptor)
    println("OK")
  }

  override def processElement(
      packetInput: MAINDATACLASS,
      readOnlyCtx: KeyedBroadcastProcessFunction[String, MAINDATACLASS, BCASTDATACLASS, MAINDATACLASS]#ReadOnlyContext,
      out: Collector[MAINDATACLASS]): Unit = {
    // get read-only broadcast state
    val gatewayEvents = readOnlyCtx.getBroadcastState(gatewayEventsStateDescriptor)
    // forward the packet unchanged
    out.collect(packetInput)
  }
}

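After connecting the data and configuration streams, I open a window and do some processing. But when I open a window on gwEnrichedPackets, nothing happens: in the Flink UI I can see that messages only go into the window and nothing comes out. Even when I use session windows and stop the data flow, the windows do not fire.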

allowedLateness and sideOutputLateData did not help in troubleshooting the problem.

Interestingly, if I open the windows on packets instead, everything works.

//  val sessionWindows = gwEnrichedPackets - NOT works
//  val sessionWindows = packets - Works

val sessionWindows = gwEnrichedPackets
  .keyBy(_.tag.tagId)
  .timeWindow(Time.seconds(20))
  //.window(EventTimeSessionWindows.withGap(Time.seconds(120)))
  //.allowedLateness(Time.seconds(12000))
  //.sideOutputLateData(new OutputTag[MAINDATACLASS]("late-readings"))
  .process(new DetectTagGatewayDisconnections)

val lateStream = sessionWindows
  .getSideOutput(new OutputTag[MAINDATACLASS]("late-readings"))

lateStream.print()
sessionWindows.print()

What am I doing wrong?

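In this case the problem is the watermarks. You assign watermarks to only one of the streams, and when an operator has multiple input streams, Flink always takes the lowest watermark among them.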

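So in your case Flink has to choose between the watermark generated by packets and the one coming from the broadcast stream. The latter will always be Long.MinValue (since the control stream has no watermark generator), so Flink always picks Long.MinValue and the windows never make progress.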
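To make the rule above concrete, here is a small plain-Scala model of it. It does not use Flink at all; the object and function names (MinWatermarkSketch, operatorWatermark, windowFires) are made up for illustration, and the numbers are arbitrary. It only mirrors two facts: a multi-input operator advances to the minimum of its input watermarks, and an event-time window fires once the watermark reaches the window's last timestamp (end - 1).

```scala
// Plain-Scala model (no Flink dependency) of how a two-input operator
// derives its event-time clock from the watermarks of its inputs.
// All names here are hypothetical and exist only for this sketch.
object MinWatermarkSketch {
  // An input that never emits watermarks effectively stays at Long.MinValue.
  val NoWatermark: Long = Long.MinValue

  // The operator's watermark is the minimum over all input watermarks.
  def operatorWatermark(inputWatermarks: Seq[Long]): Long = inputWatermarks.min

  // An event-time window [start, end) fires once the watermark reaches end - 1.
  def windowFires(windowEnd: Long, watermark: Long): Boolean =
    watermark >= windowEnd - 1

  def main(args: Array[String]): Unit = {
    val packetsWatermark = 25000L         // packets has a watermark strategy
    val broadcastWatermark = NoWatermark  // the broadcast stream has none

    val combined = operatorWatermark(Seq(packetsWatermark, broadcastWatermark))
    println(s"combined watermark: $combined")
    println(s"window ending at 20000 fires on combined: ${windowFires(20000L, combined)}")
    println(s"window ending at 20000 fires on packets:  ${windowFires(20000L, packetsWatermark)}")
  }
}
```

With one input stuck at Long.MinValue, the combined watermark never moves, which matches the symptom in the question: elements enter the window but nothing is ever emitted.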

In this case you can solve the problem simply by adding a watermark assigner to the gwEnrichedPackets stream.
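A rough sketch of what that could look like, reusing the names from the question. It is a fragment of that job rather than a standalone program. It assumes that PacketWatermarkGenerator is a SerializableTimestampAssigner[MAINDATACLASS] (as its use in the question suggests) and that the same 2-second out-of-orderness bound is acceptable after the enrichment step; gwEnrichedPacketsWithWatermarks is a name introduced here for illustration.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Re-assign timestamps and watermarks AFTER the connect/process step, so the
// window operator no longer depends on the watermark of the broadcast input.
val gwEnrichedPacketsWithWatermarks = gwEnrichedPackets
  .assignTimestampsAndWatermarks(WatermarkStrategy
    .forBoundedOutOfOrderness[MAINDATACLASS](Duration.ofSeconds(2))
    // assumption: the same timestamp assigner used for `packets` applies here
    .withTimestampAssigner(new PacketWatermarkGenerator))

// Open the window on the re-watermarked stream instead of gwEnrichedPackets.
val sessionWindows = gwEnrichedPacketsWithWatermarks
  .keyBy(_.tag.tagId)
  .timeWindow(Time.seconds(20))
  .process(new DetectTagGatewayDisconnections)
```

The new assigner generates watermarks from the enriched elements themselves, so the window operator downstream sees event time advancing even though the broadcast side never emits a watermark.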