使用 akka 流询问模式不起作用

Ask pattern not working using akka streams

我尝试实现一个与 tutorial 略有不同的示例:

object Test extends App {
      class A extends Actor {
        override def receive: Receive = {
          case 10 => context.system.terminate()
          case x =>
            println(s"Received: $x")
            sender() ! x
        }
      }

      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      implicit val dispatcher = system.dispatcher
      implicit val askTimeout = Timeout(1, TimeUnit.SECONDS)

      val a = system.actorOf(Props[A])

      val graph = RunnableGraph.fromGraph(GraphDSL.create() {
        {
          implicit builder: GraphDSL.Builder[NotUsed] => {
            import GraphDSL.Implicits._

            val source: Source[Int, NotUsed] = Source(1 to 10)
            source ~> Flow[Int].ask(2)(a) ~> Sink.foreach[String](println)
            ClosedShape
          }
        }
      })

      val x = graph.run()
}

结果我得到这个:

Received: 1

Received: 2

但我希望这样:

Received: 1

1

Received: 2

2

...

Received: 9

9

如果我通过 Source.actorRef 向演员发送消息,那么一切正常,演员会收到所有消息。看起来物化演员正在等待引用演员的回应。 如何解决?

如果将 Sink.foreach[String](println) 替换为 Sink.onComplete(println) 您会看到流程因错误而终止:

Received: 1
Received: 2
Failure(java.lang.ClassCastException: Cannot cast java.lang.Integer to scala.runtime.Nothing$)

问题是 Source.ask 需要知道什么 class 才能将演员的回复映射到 see API docs。否则它将尝试将回复转换为 Nothing 并在内部抛出异常。

要修复您的原始代码,请将 ask 替换为 ask[Int]:

  val graph = RunnableGraph.fromGraph(GraphDSL.create() {
    {
      implicit builder: GraphDSL.Builder[NotUsed] => {
        import GraphDSL.Implicits._

        val source: Source[Int, NotUsed] = Source(1 to 10)
        source ~> Flow[Int].ask[Int](2)(a) ~> Sink.foreach(println)
        ClosedShape
      }
    }
  })