如何从递归生成值的流创建 akka-stream 源?
How to create an akka-stream Source from a Flow that generate values recursively?
我需要遍历一个形状像树的API。例如,目录结构或讨论主题。它可以通过以下流程建模:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
如何遍历这些数据?我得到了以下工作:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
但是,由于我使用的是带缓冲区的流,因此流永远不会完成。
Completes when upstream completes and buffered elements have been drained
我多次阅读 Graph cycles, liveness, and deadlocks 部分,但我仍在努力寻找答案。
这将创建一个活锁:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
编辑: 我添加了一个 git 仓库来测试你的解决方案 https://github.com/MasseGuillaume/source-unfold
未完成原因
我认为流从未完成的原因不是 "using a flow with a buffer"。与 类似的实际原因是,使用默认参数 eagerClose=False
的合并正在等待 source
和 buffer
在它之前完成(合并)完成。但是缓冲区正在等待合并完成。所以合并正在等待缓冲区,缓冲区正在等待合并。
eagerClose 合并
您可以在创建合并时设置 eagerClose=True
。但是不幸的是,使用 eager close 可能会导致一些子 ItemId
值永远不会被查询。
间接解
如果您为树的每个级别具体化一个新流,则可以将递归提取到流之外。
您可以使用 itemFlow
:
构建查询函数
val itemQuery : Iterable[ItemId] => Future[Seq[Data]] =
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])
现在可以将此查询函数包装在递归辅助函数中:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] =
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet
if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}
创建的流数将等于树的最大深度。
不幸的是,因为涉及到 Futures,这个递归函数 不是尾递归的 ,如果树太深,可能会导致 "stack overflow"。
啊,Akka 流中循环的乐趣。我有一个非常相似的问题,我以一种非常老套的方式解决了。说不定对你有帮助。
Hacky 解决方案:
// add a graph stage that will complete successfully if it sees no element within 5 seconds
val timedStopper = b.add(
Flow[Item]
.idleTimeout(5.seconds)
.recoverWithRetries(1, {
case _: TimeoutException => Source.empty[Item]
}))
source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
这样做是在最后一个元素通过 timedStopper
阶段 5 秒后,该阶段成功完成流。这是通过使用 idleTimeout
来实现的,它使用 TimeoutException
使流失败,然后使用 recoverWithRetries
将失败变成成功完成。 (我确实提到过它很老套)。
如果元素之间的间隔可能超过 5 秒,或者如果您无法承受流 "actually" 完成和 Akka 接收流之间的长时间等待,这显然不适合。值得庆幸的是,我们都没有担心,在那种情况下它实际上工作得很好!
非hacky解决方案
不幸的是,我能想到的不通过超时作弊的唯一方法非常非常复杂。
基本上,您需要能够跟踪两件事:
- 缓冲区中是否还有任何元素,或者正在发送到缓冲区的过程中
- 是否打开传入源
当且仅当两个问题的答案都是否时才完成流。原生 Akka 构建块可能无法处理此问题。但是,自定义图形阶段可能。一个选项可能是编写一个替代 Merge
的位置,并为其提供某种了解缓冲区内容的方式,或者可能让它跟踪它接收到的 ID 和广播发送到缓冲区的 ID。问题是自定义图阶段在最好的时候写起来并不是特别愉快,更不用说当你像这样跨阶段混合逻辑时了。
警告
Akka 流不能很好地处理循环,尤其是它们计算完成的方式。因此,这可能不是您遇到的唯一问题。
例如,我们在非常相似的结构中遇到的一个问题是源中的故障被视为流成功完成,并且成功 Future
被具体化。问题是,默认情况下,失败的阶段将使其下游失败,但 取消 其上游(这被视为这些阶段的成功完成)。对于像您所拥有的那样的循环,结果是一场比赛,因为取消传播到一个分支,但失败传播到另一个分支。您还需要检查如果接收器出错会发生什么;根据广播的取消设置,取消可能不会向上传播并且源会愉快地继续拉入元素。
最后一个选项:完全避免使用流处理递归逻辑。在一个极端情况下,如果您有任何方法可以编写一个单一的尾递归方法来一次提取所有嵌套项并将其放入 Flow 阶段,那将解决您的问题。另一方面,我们正在认真考虑去 Kafka 为我们自己的系统排队。
我通过编写自己的 GraphStage 解决了这个问题。
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}
我需要遍历一个形状像树的API。例如,目录结构或讨论主题。它可以通过以下流程建模:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
如何遍历这些数据?我得到了以下工作:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
但是,由于我使用的是带缓冲区的流,因此流永远不会完成。
Completes when upstream completes and buffered elements have been drained
我多次阅读 Graph cycles, liveness, and deadlocks 部分,但我仍在努力寻找答案。
这将创建一个活锁:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
编辑: 我添加了一个 git 仓库来测试你的解决方案 https://github.com/MasseGuillaume/source-unfold
未完成原因
我认为流从未完成的原因不是 "using a flow with a buffer"。与 eagerClose=False
的合并正在等待 source
和 buffer
在它之前完成(合并)完成。但是缓冲区正在等待合并完成。所以合并正在等待缓冲区,缓冲区正在等待合并。
eagerClose 合并
您可以在创建合并时设置 eagerClose=True
。但是不幸的是,使用 eager close 可能会导致一些子 ItemId
值永远不会被查询。
间接解
如果您为树的每个级别具体化一个新流,则可以将递归提取到流之外。
您可以使用 itemFlow
:
val itemQuery : Iterable[ItemId] => Future[Seq[Data]] =
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])
现在可以将此查询函数包装在递归辅助函数中:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] =
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet
if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}
创建的流数将等于树的最大深度。
不幸的是,因为涉及到 Futures,这个递归函数 不是尾递归的 ,如果树太深,可能会导致 "stack overflow"。
啊,Akka 流中循环的乐趣。我有一个非常相似的问题,我以一种非常老套的方式解决了。说不定对你有帮助。
Hacky 解决方案:
// add a graph stage that will complete successfully if it sees no element within 5 seconds
val timedStopper = b.add(
Flow[Item]
.idleTimeout(5.seconds)
.recoverWithRetries(1, {
case _: TimeoutException => Source.empty[Item]
}))
source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
这样做是在最后一个元素通过 timedStopper
阶段 5 秒后,该阶段成功完成流。这是通过使用 idleTimeout
来实现的,它使用 TimeoutException
使流失败,然后使用 recoverWithRetries
将失败变成成功完成。 (我确实提到过它很老套)。
如果元素之间的间隔可能超过 5 秒,或者如果您无法承受流 "actually" 完成和 Akka 接收流之间的长时间等待,这显然不适合。值得庆幸的是,我们都没有担心,在那种情况下它实际上工作得很好!
非hacky解决方案
不幸的是,我能想到的不通过超时作弊的唯一方法非常非常复杂。
基本上,您需要能够跟踪两件事:
- 缓冲区中是否还有任何元素,或者正在发送到缓冲区的过程中
- 是否打开传入源
当且仅当两个问题的答案都是否时才完成流。原生 Akka 构建块可能无法处理此问题。但是,自定义图形阶段可能。一个选项可能是编写一个替代 Merge
的位置,并为其提供某种了解缓冲区内容的方式,或者可能让它跟踪它接收到的 ID 和广播发送到缓冲区的 ID。问题是自定义图阶段在最好的时候写起来并不是特别愉快,更不用说当你像这样跨阶段混合逻辑时了。
警告
Akka 流不能很好地处理循环,尤其是它们计算完成的方式。因此,这可能不是您遇到的唯一问题。
例如,我们在非常相似的结构中遇到的一个问题是源中的故障被视为流成功完成,并且成功 Future
被具体化。问题是,默认情况下,失败的阶段将使其下游失败,但 取消 其上游(这被视为这些阶段的成功完成)。对于像您所拥有的那样的循环,结果是一场比赛,因为取消传播到一个分支,但失败传播到另一个分支。您还需要检查如果接收器出错会发生什么;根据广播的取消设置,取消可能不会向上传播并且源会愉快地继续拉入元素。
最后一个选项:完全避免使用流处理递归逻辑。在一个极端情况下,如果您有任何方法可以编写一个单一的尾递归方法来一次提取所有嵌套项并将其放入 Flow 阶段,那将解决您的问题。另一方面,我们正在认真考虑去 Kafka 为我们自己的系统排队。
我通过编写自己的 GraphStage 解决了这个问题。
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}