Scala - 懒惰地对有序迭代器进行分组
Scala - grouping on an ordered iterator lazily
我有一个 Iterator[Record]
是在 record.id
上这样订购的:
record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
..
record.id=2
特定 ID 的记录可能会出现很多次,所以我想编写一个函数,将此迭代器作为输入,并 returns 以惰性方式输出 Iterator[Iterator[Record]]
。
我能够想出以下内容,但在 WhosebugError
大约 500K 条记录后它失败了:
def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
var iter = iterO
def hasNext = iter.hasNext
def next() = {
val first = iter.next()
val firstValue = func(first)
val (i1, i2) = iter.span(el => func(el) == firstValue)
iter = i2
Iterator(first) ++ i1
}
}
我做错了什么?
这里的问题是每个 Iterator.span
调用都会为 trailing
迭代器创建另一个堆栈闭包,如果没有任何蹦床,它很容易溢出。
实际上我不认为有一个实现,它不是记忆前缀迭代器的元素,因为跟随的迭代器可以在前缀被排出之前访问。
即使在 .span
implementation 中也有一个 Queue
来记忆 Leading
定义中的元素。
我能想象到的最简单的实现是下面的 Stream
。
implicit class StreamChopOps[T](xs: Stream[T]) {
def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
case x #:: _ =>
def eq(e: T) = f(e) == f(x)
xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
case _ => Stream.empty
}
}
尽管它可能不是性能最好的,因为它会记忆很多。但是通过适当的迭代,GC 应该处理过多的中间流的问题。
您可以将其用作 myIterator.toStream.chopBy(f)
简单检查验证以下代码可以 运行 没有 SO
Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
.toStream.chopBy(identity) //(1,1),(2),(1,1),(2),...
.map(xs => xs.sum * xs.size).sum //60000000
受 @Odomontois 实现的 chopBy 的启发,这里是我为 Iterator 实现的 chopBy。当然,每个批量都应该适合分配的内存。它看起来不是很优雅,但它似乎工作:)
implicit class IteratorChopOps[A](toChopIter: Iterator[A]) {
def chopBy[U](f: A => U) = new Iterator[Traversable[A]] {
var next_el: Option[A] = None
@tailrec
private def accum(acc: List[A]): List[A] = {
next_el = None
val new_acc = hasNext match {
case true =>
val next = toChopIter.next()
acc match {
case Nil =>
acc :+ next
case _ MatchTail t if (f(t) == f(next)) =>
acc :+ next
case _ =>
next_el = Some(next)
acc
}
case false =>
next_el = None
return acc
}
next_el match{
case Some(_) =>
new_acc
case None => accum(new_acc)
}
}
def hasNext = {
toChopIter.hasNext || next_el.isDefined
}
def next: Traversable[A] = accum(next_el.toList)
}
}
这里是匹配尾巴的提取器:
object MatchTail {
def unapply[A] (l: Traversable[A]) = Some( (l.init, l.last) )
}
我有一个 Iterator[Record]
是在 record.id
上这样订购的:
record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
..
record.id=2
特定 ID 的记录可能会出现很多次,所以我想编写一个函数,将此迭代器作为输入,并 returns 以惰性方式输出 Iterator[Iterator[Record]]
。
我能够想出以下内容,但在 WhosebugError
大约 500K 条记录后它失败了:
def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
var iter = iterO
def hasNext = iter.hasNext
def next() = {
val first = iter.next()
val firstValue = func(first)
val (i1, i2) = iter.span(el => func(el) == firstValue)
iter = i2
Iterator(first) ++ i1
}
}
我做错了什么?
这里的问题是每个 Iterator.span
调用都会为 trailing
迭代器创建另一个堆栈闭包,如果没有任何蹦床,它很容易溢出。
实际上我不认为有一个实现,它不是记忆前缀迭代器的元素,因为跟随的迭代器可以在前缀被排出之前访问。
即使在 .span
implementation 中也有一个 Queue
来记忆 Leading
定义中的元素。
我能想象到的最简单的实现是下面的 Stream
。
implicit class StreamChopOps[T](xs: Stream[T]) {
def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
case x #:: _ =>
def eq(e: T) = f(e) == f(x)
xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
case _ => Stream.empty
}
}
尽管它可能不是性能最好的,因为它会记忆很多。但是通过适当的迭代,GC 应该处理过多的中间流的问题。
您可以将其用作 myIterator.toStream.chopBy(f)
简单检查验证以下代码可以 运行 没有 SO
Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
.toStream.chopBy(identity) //(1,1),(2),(1,1),(2),...
.map(xs => xs.sum * xs.size).sum //60000000
受 @Odomontois 实现的 chopBy 的启发,这里是我为 Iterator 实现的 chopBy。当然,每个批量都应该适合分配的内存。它看起来不是很优雅,但它似乎工作:)
implicit class IteratorChopOps[A](toChopIter: Iterator[A]) {
def chopBy[U](f: A => U) = new Iterator[Traversable[A]] {
var next_el: Option[A] = None
@tailrec
private def accum(acc: List[A]): List[A] = {
next_el = None
val new_acc = hasNext match {
case true =>
val next = toChopIter.next()
acc match {
case Nil =>
acc :+ next
case _ MatchTail t if (f(t) == f(next)) =>
acc :+ next
case _ =>
next_el = Some(next)
acc
}
case false =>
next_el = None
return acc
}
next_el match{
case Some(_) =>
new_acc
case None => accum(new_acc)
}
}
def hasNext = {
toChopIter.hasNext || next_el.isDefined
}
def next: Traversable[A] = accum(next_el.toList)
}
}
这里是匹配尾巴的提取器:
object MatchTail {
def unapply[A] (l: Traversable[A]) = Some( (l.init, l.last) )
}