flink 数据不是由 timewindow operator 中的 process 函数处理的

flink data is not processed by the process function in a timewindow operator

我有一个时间窗口,我试图确定我是否在一段时间内获得了新密钥。我正在通过 kafka 推送数据,当我调试它时,我看到数据正在进入 keyby 方法,但它没有到达 process 方法,也没有被收集器收集。我正在使用 BoundedOutOfOrdernessTimestampExtractor 分配水印:

    case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest

    class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
      override def extractTimestamp(element: Src): Long = element.ts
    }

    class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
      private lazy val stateDescriptor = new ValueStateDescriptor("refFilter",  createTypeInformation[String])

      override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
        println(s"RefIpFilter processing $key")//data is not getting here 
        if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
          println(s"new key found $key") //data is not getting here also 
          context.windowState.getState(stateDescriptor).update(key)
          out.collect(elements.head)
        }
      }
    }

lazy val env: StreamExecutionEnvironment =
    setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))

 lazy val src: DataStream[FooRequest] = env.addSource(consumer)

 lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src 
        .flatMap(new FlatMapFunction[FooRequest,Src ]{
          override def flatMap(value: FooRequest, out: Collector[Src]): Unit =   value match {
            case r: Src =>
              out.collect(r)
            case invalid =>
              log.warn(s"filtered unexpected request $invalid")
          }
        })
        .assignTimestampsAndWatermarks(new TsExtractor)
        .keyBy(r => r.ref)
        .timeWindow(Time.seconds(120))
        .allowedLateness(Time.seconds(360))
        .process(new RefFilter)

uniqueRef(src).addSink(sink)
env.execute()

如有任何帮助,我们将不胜感激

BoundedOutOfOrdernessTimestampExtractor 跟踪它迄今为止看到的最高时间戳,并生成落后于配置延迟(在本例中为三个小时)的水印。这些水印会定期生成,默认情况下每 200 毫秒生成一次。因此,只有一个事件,水印将比该事件晚 3 小时,并且永远不会触发 window。此外,对于有限输入,作业将在处理完所有事件后停止 运行。

context.windowState 是每个 window 状态,生命周期有限。每 2 分钟 window 都会有自己的实例,一旦 window 允许的迟到时间到期,它就会被清除。如果您想要具有全局范围的键控 window 状态,具有不确定的生命周期,请改用 context.globalState