使用 Flink 在键控 Window 中获取计数
Using Flink to get Counts Within a Keyed Window
我正在通过 Scala 接口使用 Flink 进行一些数据处理。我有一些元组形式的用户数据:
(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")
我想按用户键入,创建一个 window,然后计算用户在该 window 中观看特定电影的次数,这样我最终得到从每部电影映射到每个用户的观看次数。例如,对于 user1
,正确的输出是 Map("titanic" -> 2, "batman" -> 1)
。
我知道我的代码的第一部分应该是这样的:
keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))
但我不知道如何在 window 中进行进一步的聚合,以便我最终得到每个 user/window 的视图计数地图。我试图编写自己的 AggregateFunction 将这些计数收集到可变 Map 中,但不幸的是,可变 Map 不可序列化,因此它失败了。
我该怎么做?
您应该可以使用 AggregateFunction
:
来解决问题
source
.keyBy(0)
.timeWindow(Time.seconds(10L))
.aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {
override def createAccumulator(): (String, Map[String, Int]) = ("", Map())
override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
val counter = accumulator._2.getOrElse(value._2, 0)
(value._1, accumulator._2 + (value._2 -> (counter + 1)))
}
override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator
override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
(a._1, (a._2.keySet ++ b._2.keySet) map (k => k -> (a._2.getOrElse(k, 0) + b._2.getOrElse(k, 0))) toMap)
}
})
我正在通过 Scala 接口使用 Flink 进行一些数据处理。我有一些元组形式的用户数据:
(user1, "titanic")
(user1, "titanic")
(user1, "batman")
(user2, "star wars")
(user2, "star wars")
(user2, "batman")
我想按用户键入,创建一个 window,然后计算用户在该 window 中观看特定电影的次数,这样我最终得到从每部电影映射到每个用户的观看次数。例如,对于 user1
,正确的输出是 Map("titanic" -> 2, "batman" -> 1)
。
我知道我的代码的第一部分应该是这样的:
keyedStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(10)))
但我不知道如何在 window 中进行进一步的聚合,以便我最终得到每个 user/window 的视图计数地图。我试图编写自己的 AggregateFunction 将这些计数收集到可变 Map 中,但不幸的是,可变 Map 不可序列化,因此它失败了。
我该怎么做?
您应该可以使用 AggregateFunction
:
source
.keyBy(0)
.timeWindow(Time.seconds(10L))
.aggregate(new AggregateFunction[(String, String), (String, Map[String, Int]), (String, Map[String, Int])] {
override def createAccumulator(): (String, Map[String, Int]) = ("", Map())
override def add(value: (String, String), accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = {
val counter = accumulator._2.getOrElse(value._2, 0)
(value._1, accumulator._2 + (value._2 -> (counter + 1)))
}
override def getResult(accumulator: (String, Map[String, Int])): (String, Map[String, Int]) = accumulator
override def merge(a: (String, Map[String, Int]), b: (String, Map[String, Int])): (String, Map[String, Int]) = {
(a._1, (a._2.keySet ++ b._2.keySet) map (k => k -> (a._2.getOrElse(k, 0) + b._2.getOrElse(k, 0))) toMap)
}
})