Flink:为流的每个传入元素评估window
Flink: Evaluate window for each incoming element of stream
我有以下形式的预订元素流:
Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)
我需要按位置查找过去 15 分钟内的预订数。但是,对于某个地点的任何新预订,都应该评估 window。
大致像:
Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)
除了 15 分钟结束时的 trigger function should not be evaluated
而不是 whenever any booking arrives at a location
和 emit the count of booking in last 15min from timestamp of newly arrived booking
.
方法:
使用RichMap函数,将位置预订的优先队列维护为托管状态(ValueState),时间戳作为预订的优先级。对于每个到达的元素,首先将其添加到状态并从当前到达的元素中删除早于 15 分钟的元素。将优先级队列中剩余元素的计数发送给收集器。
这是正确的方法还是可以通过使用其他一些 flink 构造以更好的方式来实现。
如果您 运行 使用基于堆的状态后端,那么您提出的建议应该表现得相当好。但是使用 RocksDB 时,每次访问都必须经过 serialization/deserialization 个优先级队列,这可能会很痛苦。
一种可能在 RocksDB 上表现更好的方法是将当前计数与最早的时间戳一起保存在 ValueState 中,并将预订集保存在 ListState 中。 RocksDB 状态后端可以附加到 ListState 而无需经过 ser/de,因此当最早的元素太旧时,您只需反序列化和重新序列化整个列表。
我有以下形式的预订元素流:
Booking(id=B1, driverId=D1, time=t1, location=l1)
Booking(id=B2, driverId=D2, time=t2, location=l2)
我需要按位置查找过去 15 分钟内的预订数。但是,对于某个地点的任何新预订,都应该评估 window。
大致像:
Assuming `time` field is set as timestamp of record
bookingStream.keyBy(b=>b.location).window(Any window of 15mins).trigger(triggerFunction)
除了 15 分钟结束时的 trigger function should not be evaluated
而不是 whenever any booking arrives at a location
和 emit the count of booking in last 15min from timestamp of newly arrived booking
.
方法:
使用RichMap函数,将位置预订的优先队列维护为托管状态(ValueState),时间戳作为预订的优先级。对于每个到达的元素,首先将其添加到状态并从当前到达的元素中删除早于 15 分钟的元素。将优先级队列中剩余元素的计数发送给收集器。
这是正确的方法还是可以通过使用其他一些 flink 构造以更好的方式来实现。
如果您 运行 使用基于堆的状态后端,那么您提出的建议应该表现得相当好。但是使用 RocksDB 时,每次访问都必须经过 serialization/deserialization 个优先级队列,这可能会很痛苦。
一种可能在 RocksDB 上表现更好的方法是将当前计数与最早的时间戳一起保存在 ValueState 中,并将预订集保存在 ListState 中。 RocksDB 状态后端可以附加到 ListState 而无需经过 ser/de,因此当最早的元素太旧时,您只需反序列化和重新序列化整个列表。