Akka Streams:如何计算不同来源的不同值
Akka Streams: how to count distinct values in different sources
我有N
(这在每个具体化上可能不同)有限的排序数字源。我需要这些数字流的结果以及它们出现的次数。
例如:
1,3,5,7 -> | |
1,5,7 -> | ? | -> (1,2),(2,1),(3,1),(4,1),(5,3),(7,2)
2,4,5 -> | |
如何实施?
基本上,您将多个来源合并为一个,然后聚合数据。
这是我用来组合多个来源的辅助对象和方法:
object ConcatSources {
def apply[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] = {
sources match {
case first :: second :: rest =>
Source.combine(first, second, rest: _*)(Concat(_))
case first :: _ =>
first
case Nil =>
Source.empty
}
}
}
然后解决你的任务:
val sources: Seq[Source[Int, NotUsed]] = Seq(
Source[Int](List(1, 3, 5, 7)),
Source[Int](List(1, 5, 7)),
Source[Int](List(2, 4, 5))
)
ConcatSources(sources).fold(Map[Int, Int]()) { (map, i) =>
map.updated(i, map.getOrElse(i, 0) + 1)
}.runForeach(map => println(map.toList))
另一个答案将所有源连接成一个流...
1,3,5,7,1,5,7,2,4,5
...并通过折叠元素及其各自计数的映射进行累积。必须消耗整个流才能计算计数:换句话说,如果不消耗整个流,就无法知道任何整数出现的次数。
另一种方法是使用 Source#mergeSorted
将 pre-sorted 流合并为一个排序的流...
1,1,2,3,4,5,5,5,7,7
...然后使用 statefulMapConcat
产生计数:
val src1 = Source(List(1, 3, 5, 7))
val src2 = Source(List(1, 5, 7))
val src3 = Source(List(2, 4, 5))
def mergeSortedSources(sources: List[Source[Int, NotUsed]]): Source[Int, NotUsed] =
sources.foldLeft(Source.empty[Int])(_ mergeSorted _)
.concat(Source.single(0)) // this ending element is needed to print the last pair
mergeSortedSources(List(src1, src2, src3))
.statefulMapConcat { () =>
var prev: Option[Int] = None
var count = 0
x =>
prev match {
case None | Some(`x`) =>
count = count + 1
prev = Some(x)
Nil
case Some(oldElem) =>
val oldCount = count
count = 1
prev = Some(x)
(oldElem -> oldCount) :: Nil
}
}.runForeach(println)
运行 上面的代码打印如下:
(1,2)
(2,1)
(3,1)
(4,1)
(5,3)
(7,2)
由于对合并流进行了排序,因此在处理流时会按连续顺序计算计数。即先确定1
出现的次数,再确定2
出现的次数,以此类推
我有N
(这在每个具体化上可能不同)有限的排序数字源。我需要这些数字流的结果以及它们出现的次数。
例如:
1,3,5,7 -> | |
1,5,7 -> | ? | -> (1,2),(2,1),(3,1),(4,1),(5,3),(7,2)
2,4,5 -> | |
如何实施?
基本上,您将多个来源合并为一个,然后聚合数据。 这是我用来组合多个来源的辅助对象和方法:
object ConcatSources {
def apply[T](sources: Seq[Source[T, NotUsed]]): Source[T, NotUsed] = {
sources match {
case first :: second :: rest =>
Source.combine(first, second, rest: _*)(Concat(_))
case first :: _ =>
first
case Nil =>
Source.empty
}
}
}
然后解决你的任务:
val sources: Seq[Source[Int, NotUsed]] = Seq(
Source[Int](List(1, 3, 5, 7)),
Source[Int](List(1, 5, 7)),
Source[Int](List(2, 4, 5))
)
ConcatSources(sources).fold(Map[Int, Int]()) { (map, i) =>
map.updated(i, map.getOrElse(i, 0) + 1)
}.runForeach(map => println(map.toList))
另一个答案将所有源连接成一个流...
1,3,5,7,1,5,7,2,4,5
...并通过折叠元素及其各自计数的映射进行累积。必须消耗整个流才能计算计数:换句话说,如果不消耗整个流,就无法知道任何整数出现的次数。
另一种方法是使用 Source#mergeSorted
将 pre-sorted 流合并为一个排序的流...
1,1,2,3,4,5,5,5,7,7
...然后使用 statefulMapConcat
产生计数:
val src1 = Source(List(1, 3, 5, 7))
val src2 = Source(List(1, 5, 7))
val src3 = Source(List(2, 4, 5))
def mergeSortedSources(sources: List[Source[Int, NotUsed]]): Source[Int, NotUsed] =
sources.foldLeft(Source.empty[Int])(_ mergeSorted _)
.concat(Source.single(0)) // this ending element is needed to print the last pair
mergeSortedSources(List(src1, src2, src3))
.statefulMapConcat { () =>
var prev: Option[Int] = None
var count = 0
x =>
prev match {
case None | Some(`x`) =>
count = count + 1
prev = Some(x)
Nil
case Some(oldElem) =>
val oldCount = count
count = 1
prev = Some(x)
(oldElem -> oldCount) :: Nil
}
}.runForeach(println)
运行 上面的代码打印如下:
(1,2)
(2,1)
(3,1)
(4,1)
(5,3)
(7,2)
由于对合并流进行了排序,因此在处理流时会按连续顺序计算计数。即先确定1
出现的次数,再确定2
出现的次数,以此类推