flink 数据不是由 timewindow operator 中的 process 函数处理的
flink data is not processed by the process function in a timewindow operator
我有一个时间窗口,我试图确定我是否在一段时间内获得了新密钥。我正在通过 kafka 推送数据,当我调试它时,我看到数据正在进入 keyby
方法,但它没有到达 process
方法,也没有被收集器收集。我正在使用 BoundedOutOfOrdernessTimestampExtractor
分配水印:
case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest
class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
override def extractTimestamp(element: Src): Long = element.ts
}
class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
println(s"RefIpFilter processing $key")//data is not getting here
if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
println(s"new key found $key") //data is not getting here also
context.windowState.getState(stateDescriptor).update(key)
out.collect(elements.head)
}
}
}
lazy val env: StreamExecutionEnvironment =
setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))
lazy val src: DataStream[FooRequest] = env.addSource(consumer)
lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src
.flatMap(new FlatMapFunction[FooRequest,Src ]{
override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
case r: Src =>
out.collect(r)
case invalid =>
log.warn(s"filtered unexpected request $invalid")
}
})
.assignTimestampsAndWatermarks(new TsExtractor)
.keyBy(r => r.ref)
.timeWindow(Time.seconds(120))
.allowedLateness(Time.seconds(360))
.process(new RefFilter)
uniqueRef(src).addSink(sink)
env.execute()
如有任何帮助,我们将不胜感激
BoundedOutOfOrdernessTimestampExtractor
跟踪它迄今为止看到的最高时间戳,并生成落后于配置延迟(在本例中为三个小时)的水印。这些水印会定期生成,默认情况下每 200 毫秒生成一次。因此,只有一个事件,水印将比该事件晚 3 小时,并且永远不会触发 window。此外,对于有限输入,作业将在处理完所有事件后停止 运行。
context.windowState
是每个 window 状态,生命周期有限。每 2 分钟 window 都会有自己的实例,一旦 window 允许的迟到时间到期,它就会被清除。如果您想要具有全局范围的键控 window 状态,具有不确定的生命周期,请改用 context.globalState
。
我有一个时间窗口,我试图确定我是否在一段时间内获得了新密钥。我正在通过 kafka 推送数据,当我调试它时,我看到数据正在进入 keyby
方法,但它没有到达 process
方法,也没有被收集器收集。我正在使用 BoundedOutOfOrdernessTimestampExtractor
分配水印:
case class Src(qip:Ip, ref: Ip, ts: Long) extends FooRequest
class TsExtractor extends BoundedOutOfOrdernessTimestampExtractor[Src](Time.hours(3)){
override def extractTimestamp(element: Src): Long = element.ts
}
class RefFilter extends ProcessWindowFunction[Src, IpDetectionSrc, String, TimeWindow]{
private lazy val stateDescriptor = new ValueStateDescriptor("refFilter", createTypeInformation[String])
override def process(key: String, context: Context, elements: Iterable[Src], out: Collector[IpDetectionSrc]): Unit = {
println(s"RefIpFilter processing $key")//data is not getting here
if(Option(context.windowState.getState(stateDescriptor).value()).isEmpty){
println(s"new key found $key") //data is not getting here also
context.windowState.getState(stateDescriptor).update(key)
out.collect(elements.head)
}
}
}
lazy val env: StreamExecutionEnvironment =
setupEnv(StreamExecutionEnvironment.getExecutionEnvironment)(300000,Some(stateDir), Some(TimeCharacteristic.EventTime))
lazy val src: DataStream[FooRequest] = env.addSource(consumer)
lazy val uniqueRef:DataStream[FooRequest] => DataStream[Src] = src => src
.flatMap(new FlatMapFunction[FooRequest,Src ]{
override def flatMap(value: FooRequest, out: Collector[Src]): Unit = value match {
case r: Src =>
out.collect(r)
case invalid =>
log.warn(s"filtered unexpected request $invalid")
}
})
.assignTimestampsAndWatermarks(new TsExtractor)
.keyBy(r => r.ref)
.timeWindow(Time.seconds(120))
.allowedLateness(Time.seconds(360))
.process(new RefFilter)
uniqueRef(src).addSink(sink)
env.execute()
如有任何帮助,我们将不胜感激
BoundedOutOfOrdernessTimestampExtractor
跟踪它迄今为止看到的最高时间戳,并生成落后于配置延迟(在本例中为三个小时)的水印。这些水印会定期生成,默认情况下每 200 毫秒生成一次。因此,只有一个事件,水印将比该事件晚 3 小时,并且永远不会触发 window。此外,对于有限输入,作业将在处理完所有事件后停止 运行。
context.windowState
是每个 window 状态,生命周期有限。每 2 分钟 window 都会有自己的实例,一旦 window 允许的迟到时间到期,它就会被清除。如果您想要具有全局范围的键控 window 状态,具有不确定的生命周期,请改用 context.globalState
。