Akka Streams:不在 onPull(_,_) 内调用 push(_,_) 会阻塞流 - 为什么?
Akka Streams: Calling push(_,_) not within onPull(_,_) is blocking the stream - Why?
我无法理解小样本客户 Akka Streams Source 的行为。
示例背后的想法是 Source 应该向 Actor 询问下一个元素。见下方代码
class ActorSource[T](context: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]] {
val out: Outlet[T] = Outlet("actor-source")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
val receivingActor = context.actorOf(Props(new ReceivingActor(msg => {
push(out, msg)
println("push - Done")
})))
override def onPull(): Unit = {
actor ! Protocol.Pull(receivingActor)
println("onPull - Done")
}
})
}
}
override def shape: SourceShape[T] = SourceShape(out)
/**
* A small actor which receives new elements from the actual source actor.
*
* @param push The method to push elements into the stream
*/
class ReceivingActor(push: T => Unit) extends Actor with ActorLogging with UnknownMessage {
override def receive: Receive = {
case Protocol.Push(msg) =>
push(msg.asInstanceOf[T]) // I know that this is evil ....just for test in that case...
case msg =>
unknownMessage(msg)
}
}
}
object ActorSource {
/**
* Creates an [[ActorSource]]
*
* @param actor The actor which acts as the source
* @param context The context to create the internal helper actor
* @return A new akka-streams source
*/
def Source[T](actor: ActorRef)(implicit context: ActorRefFactory): AkkaSource[T, NotUsed] = {
val graph: Graph[SourceShape[T], NotUsed] = new ActorSource[T](context, actor)
AkkaSource.fromGraph(graph)
}
/**
* Defines the messages/ events for the source actor
*/
object Protocol {
/**
* Will be sent if the stream requires new elements.
*
* @param actor The actor which should receive the push message
*/
case class Pull(actor: ActorRef)
/**
* Sent by the source actor to submit a new element.
*
* @param msg The message to put into the stream.
*/
case class Push(msg: Any)
}
}
如果您像这样使用该源创建流:
class SampleActor extends Actor with ActorLogging with UnknownMessage {
var counter = 0
override def receive: Receive = {
case msg @ Protocol.Pull(actor) =>
actor ! Protocol.Push(counter)
counter = counter + 1
}
}
val sourceActor = system.actorOf(Props(new SampleActor()))
val stream = ActorSource
.Source[Int](sourceActor)(system)
.take(10)
.runWith(Sink.foreach(println))
Await.result(stream, 30 seconds)
输出结果仅如下:
onPull - Done
push - Done
第一个整数永远不会到达 Sink,并且 onPull
不再被调用。有趣的是,如果我终止程序,第一个整数将打印在接收器中。
我想知道这是功能还是错误?根据我的理解,可以在 pull
发出插座打开的信号后随时调用 push(_, _)
,即使我要求 isAvailable
它 returns true
.
谁能解释一下这种行为?
阅读更多文档后 (RTFM ;)) 我想我找到了问题的答案。
在多个地方都提到在相关回调之外调用这些 API 方法是不安全的。为了实现我要实现的目标,Akka Streams 提供了一个 getAsyncCallback
方法。
可以在此处找到更多详细信息:https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-customize.html#using-asynchronous-side-channels。
我无法理解小样本客户 Akka Streams Source 的行为。
示例背后的想法是 Source 应该向 Actor 询问下一个元素。见下方代码
class ActorSource[T](context: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]] {
val out: Outlet[T] = Outlet("actor-source")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
val receivingActor = context.actorOf(Props(new ReceivingActor(msg => {
push(out, msg)
println("push - Done")
})))
override def onPull(): Unit = {
actor ! Protocol.Pull(receivingActor)
println("onPull - Done")
}
})
}
}
override def shape: SourceShape[T] = SourceShape(out)
/**
* A small actor which receives new elements from the actual source actor.
*
* @param push The method to push elements into the stream
*/
class ReceivingActor(push: T => Unit) extends Actor with ActorLogging with UnknownMessage {
override def receive: Receive = {
case Protocol.Push(msg) =>
push(msg.asInstanceOf[T]) // I know that this is evil ....just for test in that case...
case msg =>
unknownMessage(msg)
}
}
}
object ActorSource {
/**
* Creates an [[ActorSource]]
*
* @param actor The actor which acts as the source
* @param context The context to create the internal helper actor
* @return A new akka-streams source
*/
def Source[T](actor: ActorRef)(implicit context: ActorRefFactory): AkkaSource[T, NotUsed] = {
val graph: Graph[SourceShape[T], NotUsed] = new ActorSource[T](context, actor)
AkkaSource.fromGraph(graph)
}
/**
* Defines the messages/ events for the source actor
*/
object Protocol {
/**
* Will be sent if the stream requires new elements.
*
* @param actor The actor which should receive the push message
*/
case class Pull(actor: ActorRef)
/**
* Sent by the source actor to submit a new element.
*
* @param msg The message to put into the stream.
*/
case class Push(msg: Any)
}
}
如果您像这样使用该源创建流:
class SampleActor extends Actor with ActorLogging with UnknownMessage {
var counter = 0
override def receive: Receive = {
case msg @ Protocol.Pull(actor) =>
actor ! Protocol.Push(counter)
counter = counter + 1
}
}
val sourceActor = system.actorOf(Props(new SampleActor()))
val stream = ActorSource
.Source[Int](sourceActor)(system)
.take(10)
.runWith(Sink.foreach(println))
Await.result(stream, 30 seconds)
输出结果仅如下:
onPull - Done
push - Done
第一个整数永远不会到达 Sink,并且 onPull
不再被调用。有趣的是,如果我终止程序,第一个整数将打印在接收器中。
我想知道这是功能还是错误?根据我的理解,可以在 pull
发出插座打开的信号后随时调用 push(_, _)
,即使我要求 isAvailable
它 returns true
.
谁能解释一下这种行为?
阅读更多文档后 (RTFM ;)) 我想我找到了问题的答案。
在多个地方都提到在相关回调之外调用这些 API 方法是不安全的。为了实现我要实现的目标,Akka Streams 提供了一个 getAsyncCallback
方法。
可以在此处找到更多详细信息:https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-customize.html#using-asynchronous-side-channels。