Scala - 懒惰地对有序迭代器进行分组

Scala - grouping on an ordered iterator lazily

我有一个 Iterator[Record] 是在 record.id 上这样订购的:

record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
..
record.id=2

特定 ID 的记录可能会出现很多次,所以我想编写一个函数,将此迭代器作为输入,并 returns 以惰性方式输出 Iterator[Iterator[Record]]

我能够想出以下内容,但在 WhosebugError 大约 500K 条记录后它失败了:

def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
    var iter = iterO
    def hasNext = iter.hasNext

    def next() = {
      val first = iter.next()
      val firstValue = func(first)
      val (i1, i2) = iter.span(el => func(el) == firstValue)
      iter = i2
      Iterator(first) ++ i1
    }
  }

我做错了什么?

这里的问题是每个 Iterator.span 调用都会为 trailing 迭代器创建另一个堆栈闭包,如果没有任何蹦床,它很容易溢出。

实际上我不认为有一个实现,它不是记忆前缀迭代器的元素,因为跟随的迭代器可以在前缀被排出之前访问。

即使在 .span implementation 中也有一个 Queue 来记忆 Leading 定义中的元素。

我能想象到的最简单的实现是下面的 Stream

implicit class StreamChopOps[T](xs: Stream[T]) {
  def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
    case x #:: _ =>
      def eq(e: T) = f(e) == f(x)
      xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
    case _ => Stream.empty
  }
}

尽管它可能不是性能最好的,因为它会记忆很多。但是通过适当的迭代,GC 应该处理过多的中间流的问题。

您可以将其用作 myIterator.toStream.chopBy(f)

简单检查验证以下代码可以 运行 没有 SO

Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
  .toStream.chopBy(identity)                     //(1,1),(2),(1,1),(2),...
  .map(xs => xs.sum * xs.size).sum               //60000000

受 @Odomontois 实现的 chopBy 的启发,这里是我为 Iterator 实现的 chopBy。当然,每个批量都应该适合分配的内存。它看起来不是很优雅,但它似乎工作:)

implicit class IteratorChopOps[A](toChopIter: Iterator[A]) {

 def chopBy[U](f: A => U) = new Iterator[Traversable[A]] {
  var next_el: Option[A] = None
  @tailrec
  private def accum(acc: List[A]): List[A] = {
    next_el = None
    val new_acc = hasNext match {
      case true =>
        val next = toChopIter.next()
        acc match {
          case Nil =>
            acc :+ next
          case _ MatchTail t if (f(t) == f(next)) =>
            acc :+ next
          case _ =>
            next_el = Some(next)
            acc
        }
      case false =>
        next_el = None
        return acc
    }

    next_el match{
      case Some(_) =>
        new_acc
      case None => accum(new_acc)
    }
  }

  def hasNext = {
    toChopIter.hasNext || next_el.isDefined
  }
  def next: Traversable[A] = accum(next_el.toList)
}
}

这里是匹配尾巴的提取器:

object MatchTail {
  def unapply[A] (l: Traversable[A]) = Some( (l.init, l.last) )
}