使用 akka 流询问模式不起作用
Ask pattern not working using akka streams
我尝试实现一个与 tutorial 略有不同的示例:
object Test extends App {
class A extends Actor {
override def receive: Receive = {
case 10 => context.system.terminate()
case x =>
println(s"Received: $x")
sender() ! x
}
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
implicit val askTimeout = Timeout(1, TimeUnit.SECONDS)
val a = system.actorOf(Props[A])
val graph = RunnableGraph.fromGraph(GraphDSL.create() {
{
implicit builder: GraphDSL.Builder[NotUsed] => {
import GraphDSL.Implicits._
val source: Source[Int, NotUsed] = Source(1 to 10)
source ~> Flow[Int].ask(2)(a) ~> Sink.foreach[String](println)
ClosedShape
}
}
})
val x = graph.run()
}
结果我得到这个:
Received: 1
Received: 2
但我希望这样:
Received: 1
1
Received: 2
2
...
Received: 9
9
如果我通过 Source.actorRef
向演员发送消息,那么一切正常,演员会收到所有消息。看起来物化演员正在等待引用演员的回应。
如何解决?
如果将 Sink.foreach[String](println)
替换为 Sink.onComplete(println)
您会看到流程因错误而终止:
Received: 1
Received: 2
Failure(java.lang.ClassCastException: Cannot cast java.lang.Integer to scala.runtime.Nothing$)
问题是 Source.ask 需要知道什么 class 才能将演员的回复映射到 see API docs。否则它将尝试将回复转换为 Nothing 并在内部抛出异常。
要修复您的原始代码,请将 ask 替换为 ask[Int]:
val graph = RunnableGraph.fromGraph(GraphDSL.create() {
{
implicit builder: GraphDSL.Builder[NotUsed] => {
import GraphDSL.Implicits._
val source: Source[Int, NotUsed] = Source(1 to 10)
source ~> Flow[Int].ask[Int](2)(a) ~> Sink.foreach(println)
ClosedShape
}
}
})
我尝试实现一个与 tutorial 略有不同的示例:
object Test extends App {
class A extends Actor {
override def receive: Receive = {
case 10 => context.system.terminate()
case x =>
println(s"Received: $x")
sender() ! x
}
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
implicit val askTimeout = Timeout(1, TimeUnit.SECONDS)
val a = system.actorOf(Props[A])
val graph = RunnableGraph.fromGraph(GraphDSL.create() {
{
implicit builder: GraphDSL.Builder[NotUsed] => {
import GraphDSL.Implicits._
val source: Source[Int, NotUsed] = Source(1 to 10)
source ~> Flow[Int].ask(2)(a) ~> Sink.foreach[String](println)
ClosedShape
}
}
})
val x = graph.run()
}
结果我得到这个:
Received: 1
Received: 2
但我希望这样:
Received: 1
1
Received: 2
2
...
Received: 9
9
如果我通过 Source.actorRef
向演员发送消息,那么一切正常,演员会收到所有消息。看起来物化演员正在等待引用演员的回应。
如何解决?
如果将 Sink.foreach[String](println)
替换为 Sink.onComplete(println)
您会看到流程因错误而终止:
Received: 1
Received: 2
Failure(java.lang.ClassCastException: Cannot cast java.lang.Integer to scala.runtime.Nothing$)
问题是 Source.ask 需要知道什么 class 才能将演员的回复映射到 see API docs。否则它将尝试将回复转换为 Nothing 并在内部抛出异常。
要修复您的原始代码,请将 ask 替换为 ask[Int]:
val graph = RunnableGraph.fromGraph(GraphDSL.create() {
{
implicit builder: GraphDSL.Builder[NotUsed] => {
import GraphDSL.Implicits._
val source: Source[Int, NotUsed] = Source(1 to 10)
source ~> Flow[Int].ask[Int](2)(a) ~> Sink.foreach(println)
ClosedShape
}
}
})