如何用 monix 的 observable 处理递归?
How to handle recursion with monix's observable?
使用 monix 我正在尝试通过构建 Observable[Node] 并使用广度优先算法来遍历图形。
但是我有一点递归问题。这是说明我的问题的片段:
package gp
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
object HelloObservable {
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i : Node) : List[Task[Node]] =
List(Task(i), Task(i+1))
def go(i :Node) : Task[Iterator[List[Node]]] =
Task.sequence(nexts(i).sliding(100,100).map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterator(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args : Array[String]) : Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}
第一次迭代后可观察到的停止。谁能告诉我为什么?
我认为这个问题与递归无关。我认为这是因为您使用 sliding
,其中 returns 和 Iterator
。 Iterator
和 Iterable
之间的主要区别在于,您只能使用一次 Iterator
,之后您只剩下一个空的 Iterator
。这意味着当您执行 firsts.flatMap
时,Observable.fromIterator(ilr)
中没有任何内容,因此不会产生任何内容。
从根本上说,如果不能在内存中保留(大部分)前缀,我认为您无法进行广度优先搜索。但是由于您的 nexts
已经 returns List
,我假设您可以负担得起在内存中拥有该列表的两个副本。而第二个副本是sliding
的物化结果。所以你的固定代码应该是这样的:
object HelloObservable {
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i: Node): List[Task[Node]] = List(Task(i), Task(i + 1))
def go(i: Node): Task[List[List[Node]]] =
Task.sequence(nexts(i).sliding(100, 100).toList.map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterable(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args: Array[String]): Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}
使用 monix 我正在尝试通过构建 Observable[Node] 并使用广度优先算法来遍历图形。 但是我有一点递归问题。这是说明我的问题的片段:
package gp
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
object HelloObservable {
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i : Node) : List[Task[Node]] =
List(Task(i), Task(i+1))
def go(i :Node) : Task[Iterator[List[Node]]] =
Task.sequence(nexts(i).sliding(100,100).map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterator(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args : Array[String]) : Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}
第一次迭代后可观察到的停止。谁能告诉我为什么?
我认为这个问题与递归无关。我认为这是因为您使用 sliding
,其中 returns 和 Iterator
。 Iterator
和 Iterable
之间的主要区别在于,您只能使用一次 Iterator
,之后您只剩下一个空的 Iterator
。这意味着当您执行 firsts.flatMap
时,Observable.fromIterator(ilr)
中没有任何内容,因此不会产生任何内容。
从根本上说,如果不能在内存中保留(大部分)前缀,我认为您无法进行广度优先搜索。但是由于您的 nexts
已经 returns List
,我假设您可以负担得起在内存中拥有该列表的两个副本。而第二个副本是sliding
的物化结果。所以你的固定代码应该是这样的:
object HelloObservable {
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i: Node): List[Task[Node]] = List(Task(i), Task(i + 1))
def go(i: Node): Task[List[List[Node]]] =
Task.sequence(nexts(i).sliding(100, 100).toList.map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterable(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args: Array[String]): Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}