将 Traversable[T] 转换为 Stream[T],无需遍历或堆栈溢出
Convert Traversable[T] to Stream[T] without traversing or stack overflow
我正在使用一个提供 Traversable[T] 的库,该库可以对数据库结果进行分页。我想避免将整个东西加载到内存中,所以我试图将它转换为 Stream[T]。
据我所知,内置的 "asStream" 方法将整个 Traversable 加载到缓冲区中,这违背了我的目的。我的尝试(如下)在大结果上遇到了 WhosebugException,我不知道为什么。有人可以帮助我了解发生了什么吗?谢谢!
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
if (traversable.isEmpty) Empty
else {
lazy val head = traversable.head
lazy val tail = asStream(traversable.tail)
head #:: tail
}
}
根据@SCouto
的建议,这是一个重现此内容的完整示例
import scala.collection.immutable.Stream.Empty
object StreamTest {
def main(args: Array[String]) = {
val bigVector = Vector.fill(90000)(1)
val optionStream = asStream(bigVector).map(v => Some(v))
val zipped = optionStream.zipAll(optionStream.tail, None, None)
}
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
@annotation.tailrec
def loop(processed: => Stream[T], pending: => Traversable[T]): Stream[T] = {
if (pending.isEmpty) processed
else {
lazy val head = pending.head
lazy val tail = pending.tail
loop(processed :+ head, tail)
}
}
loop(Empty, traversable)
}
}
编辑:在@SCouto 提出了一些有趣的想法后,我了解到这也可以用蹦床来完成,以将结果保持为原始顺序的 Stream[T]
object StreamTest {
def main(args: Array[String]) = {
val bigVector = Range(1, 90000).toVector
val optionStream = asStream(bigVector).map(v => Some(v))
val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped.take(10).foreach(println)
}
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
sealed trait Traversal[+R]
case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
case object Done extends Traversal[Nothing]
def next(currentTraversable: Traversable[T]): Traversal[T] = {
if (currentTraversable.isEmpty) Done
else More(currentTraversable.head, () => next(currentTraversable.tail))
}
def trampoline[R](body: => Traversal[R]): Stream[R] = {
def loop(thunk: () => Traversal[R]): Stream[R] = {
thunk.apply match {
case More(result, next) => Stream.cons(result, loop(next))
case Done => Stream.empty
}
}
loop(() => body)
}
trampoline(next(traversable))
}
}
试试这个:
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
@annotation.tailrec
def loop(processed: Stream[T], pending: Traversable[T]): Stream[T] = {
if (pending.isEmpty) processed
else {
lazy val head = pending.head
lazy val tail = pending.tail
loop(head #:: processed, tail)
}
}
loop(Empty, traversable)
}
重点是确保你的递归调用是你递归函数的最后一个动作。
为确保这一点,您可以使用嵌套方法(在示例中称为 loop
)和确保您的方法是 tail-safe 的 tailrec
注释。
您可以找到有关 tail rec 的信息here and in this awesome answer here
编辑
问题是我们在 Stream 的末尾添加元素。如果像示例中那样将其添加为 Stream
的头部,它将正常工作。我更新了我的代码。请测试并告诉我们结果。
我的测试:
scala> val optionStream = asStream(Vector.fill(90000)(1)).map(v => Some(v))
optionStream: scala.collection.immutable.Stream[Some[Int]] = Stream(Some(1), ?)
scala> val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped: scala.collection.immutable.Stream[(Option[Int], Option[Int])] = Stream((Some(1),Some(1)), ?)
编辑2:
根据您的意见,并考虑您所说的 fpinscala 示例。我想这可能对你有帮助。重点是创建一个带有惰性求值的 case class 结构。其中头部是单个元素,尾部是可遍历的
sealed trait myStream[+T] {
def head: Option[T] = this match {
case MyEmpty => None
case MyCons(h, _) => Some(h())
}
def tail: myStream[T] = this match {
case MyEmpty => MyEmpty
case MyCons(_, t) => myStream.cons(t().head, t().tail)
}
}
case object MyEmpty extends myStream[Nothing]
case class MyCons[+T](h: () => T, t: () => Traversable[T]) extends myStream[T]
object myStream {
def cons[T](hd: => T, tl: => Traversable[T]): myStream[T] = {
lazy val head = hd
lazy val tail = tl
MyCons(() => head, () => tail)
}
def empty[T]: myStream[T] = MyEmpty
def apply[T](as: T*): myStream[T] = {
if (as.isEmpty) empty
else cons(as.head, as.tail)
}
}
一些快速测试:
val bigVector = Vector.fill(90000)(1)
myStream.cons(bigVector.head, bigVector.tail)
res2: myStream[Int] = MyCons(<function0>,<function0>)
取回头部:
res2.head
res3: Option[Int] = Some(1)
还有尾巴:
res2.tail
res4: myStream[Int] = MyCons(<function0>,<function0>)
EDIT3
操作员的蹦床解决方案:
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
sealed trait Traversal[+R]
case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
case object Done extends Traversal[Nothing]
def next(currentTraversable: Traversable[T]): Traversal[T] = {
if (currentTraversable.isEmpty) Done
else More(currentTraversable.head, () => next(currentTraversable.tail))
}
def trampoline[R](body: => Traversal[R]): Stream[R] = {
def loop(thunk: () => Traversal[R]): Stream[R] = {
thunk.apply match {
case More(result, next) => Stream.cons(result, loop(next))
case Done => Stream.empty
}
}
loop(() => body)
}
trampoline(next(traversable))
}
}
Stream
不会将数据保存在内存中,因为您声明了如何生成每个项目。您的数据库数据很可能不是程序生成的,因此您需要在第一次请求时获取数据(类似于 def getData(index: Int): Future[Data]
)。
最大的问题在于,由于您正在从数据库中获取数据,因此您可能正在使用 Future
s,因此,即使您能够实现它,您也会有一个 Future[Stream[Data]]
不太好用的对象,或者更糟糕的是,阻止它。
仅仅对数据库数据查询进行分页不是更有价值吗?
我正在使用一个提供 Traversable[T] 的库,该库可以对数据库结果进行分页。我想避免将整个东西加载到内存中,所以我试图将它转换为 Stream[T]。
据我所知,内置的 "asStream" 方法将整个 Traversable 加载到缓冲区中,这违背了我的目的。我的尝试(如下)在大结果上遇到了 WhosebugException,我不知道为什么。有人可以帮助我了解发生了什么吗?谢谢!
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
if (traversable.isEmpty) Empty
else {
lazy val head = traversable.head
lazy val tail = asStream(traversable.tail)
head #:: tail
}
}
根据@SCouto
的建议,这是一个重现此内容的完整示例import scala.collection.immutable.Stream.Empty
object StreamTest {
def main(args: Array[String]) = {
val bigVector = Vector.fill(90000)(1)
val optionStream = asStream(bigVector).map(v => Some(v))
val zipped = optionStream.zipAll(optionStream.tail, None, None)
}
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
@annotation.tailrec
def loop(processed: => Stream[T], pending: => Traversable[T]): Stream[T] = {
if (pending.isEmpty) processed
else {
lazy val head = pending.head
lazy val tail = pending.tail
loop(processed :+ head, tail)
}
}
loop(Empty, traversable)
}
}
编辑:在@SCouto 提出了一些有趣的想法后,我了解到这也可以用蹦床来完成,以将结果保持为原始顺序的 Stream[T]
object StreamTest {
def main(args: Array[String]) = {
val bigVector = Range(1, 90000).toVector
val optionStream = asStream(bigVector).map(v => Some(v))
val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped.take(10).foreach(println)
}
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
sealed trait Traversal[+R]
case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
case object Done extends Traversal[Nothing]
def next(currentTraversable: Traversable[T]): Traversal[T] = {
if (currentTraversable.isEmpty) Done
else More(currentTraversable.head, () => next(currentTraversable.tail))
}
def trampoline[R](body: => Traversal[R]): Stream[R] = {
def loop(thunk: () => Traversal[R]): Stream[R] = {
thunk.apply match {
case More(result, next) => Stream.cons(result, loop(next))
case Done => Stream.empty
}
}
loop(() => body)
}
trampoline(next(traversable))
}
}
试试这个:
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
@annotation.tailrec
def loop(processed: Stream[T], pending: Traversable[T]): Stream[T] = {
if (pending.isEmpty) processed
else {
lazy val head = pending.head
lazy val tail = pending.tail
loop(head #:: processed, tail)
}
}
loop(Empty, traversable)
}
重点是确保你的递归调用是你递归函数的最后一个动作。
为确保这一点,您可以使用嵌套方法(在示例中称为 loop
)和确保您的方法是 tail-safe 的 tailrec
注释。
您可以找到有关 tail rec 的信息here and in this awesome answer here
编辑
问题是我们在 Stream 的末尾添加元素。如果像示例中那样将其添加为 Stream
的头部,它将正常工作。我更新了我的代码。请测试并告诉我们结果。
我的测试:
scala> val optionStream = asStream(Vector.fill(90000)(1)).map(v => Some(v))
optionStream: scala.collection.immutable.Stream[Some[Int]] = Stream(Some(1), ?)
scala> val zipped = optionStream.zipAll(optionStream.tail, None, None)
zipped: scala.collection.immutable.Stream[(Option[Int], Option[Int])] = Stream((Some(1),Some(1)), ?)
编辑2:
根据您的意见,并考虑您所说的 fpinscala 示例。我想这可能对你有帮助。重点是创建一个带有惰性求值的 case class 结构。其中头部是单个元素,尾部是可遍历的
sealed trait myStream[+T] {
def head: Option[T] = this match {
case MyEmpty => None
case MyCons(h, _) => Some(h())
}
def tail: myStream[T] = this match {
case MyEmpty => MyEmpty
case MyCons(_, t) => myStream.cons(t().head, t().tail)
}
}
case object MyEmpty extends myStream[Nothing]
case class MyCons[+T](h: () => T, t: () => Traversable[T]) extends myStream[T]
object myStream {
def cons[T](hd: => T, tl: => Traversable[T]): myStream[T] = {
lazy val head = hd
lazy val tail = tl
MyCons(() => head, () => tail)
}
def empty[T]: myStream[T] = MyEmpty
def apply[T](as: T*): myStream[T] = {
if (as.isEmpty) empty
else cons(as.head, as.tail)
}
}
一些快速测试:
val bigVector = Vector.fill(90000)(1)
myStream.cons(bigVector.head, bigVector.tail)
res2: myStream[Int] = MyCons(<function0>,<function0>)
取回头部:
res2.head
res3: Option[Int] = Some(1)
还有尾巴:
res2.tail
res4: myStream[Int] = MyCons(<function0>,<function0>)
EDIT3
操作员的蹦床解决方案:
def asStream[T](traversable: => Traversable[T]): Stream[T] = {
sealed trait Traversal[+R]
case class More[+R](result: R, next: () => Traversal[R]) extends Traversal[R]
case object Done extends Traversal[Nothing]
def next(currentTraversable: Traversable[T]): Traversal[T] = {
if (currentTraversable.isEmpty) Done
else More(currentTraversable.head, () => next(currentTraversable.tail))
}
def trampoline[R](body: => Traversal[R]): Stream[R] = {
def loop(thunk: () => Traversal[R]): Stream[R] = {
thunk.apply match {
case More(result, next) => Stream.cons(result, loop(next))
case Done => Stream.empty
}
}
loop(() => body)
}
trampoline(next(traversable))
}
}
Stream
不会将数据保存在内存中,因为您声明了如何生成每个项目。您的数据库数据很可能不是程序生成的,因此您需要在第一次请求时获取数据(类似于 def getData(index: Int): Future[Data]
)。
最大的问题在于,由于您正在从数据库中获取数据,因此您可能正在使用 Future
s,因此,即使您能够实现它,您也会有一个 Future[Stream[Data]]
不太好用的对象,或者更糟糕的是,阻止它。
仅仅对数据库数据查询进行分页不是更有价值吗?