Akka stream to actor issue

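I'm trying to stream an HTTP response to an actor using Sink.actorRefWithAck, but for some reason nothing is sent to the sink actor except the init and complete messages. I may well be missing something very obvious. The sink actor does get started. Here is the code of the streaming actor: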

override def receive: Receive = {
    case GetTestData(p, id) =>
      // Fetch the data and pipe the response back to this actor as a message, as recommended
      http.singleRequest(HttpRequest(uri = uri.format(p, id)))
        .pipeTo(self)

    case HttpResponse(StatusCodes.OK, _, entity, _) =>
      // Sink as actorRef
      val sink = Sink.actorRefWithAck(context.actorOf(TestJob2.props(), "testJob2Actor"), Init, Ack, Complete)
      // Pipe the response body to the actor sink
      entity.dataBytes.runWith(sink)

    case resp @ HttpResponse(code, _, _, _) =>
      log.error("Request to test job failed, response code: " + code)
      // Discard the unread entity bytes so the connection is not stalled by backpressure
      resp.discardEntityBytes()

    case _ => log.warning("Unexpected message in TestJobActor")
}

Here is the code of the sink actor:

override def receive: Receive = {
  case Init =>
    log.info("TestJob2Actor got init sink message")
    sender ! Ack

  case Complete => log.info("TestJob2Actor got complete sink message")

  case b: ByteString =>
    log.debug(b.utf8String)
    sender ! Ack

  case _ => log.warning("Unexpected message in TestJob2Actor")
}

Output:

2018-01-25 17:26:58,530 INFO com.mcma.actors.Supervisor akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor - SupervisorActor forwarded GetTestData message to TestJobActor

2018-01-25 17:26:59,243 INFO com.mcma.actors.jobs.TestJob akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor - TestJob actor started

2018-01-25 17:27:00,052 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2 actor started

2018-01-25 17:27:00,067 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got init sink message

2018-01-25 17:27:00,083 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got complete sink message

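Some thoughts: (1) the Source from entity.dataBytes may be empty, or (2) the sink actor is in fact processing the ByteString chunks, but the log level is set so that debug messages are not visible. Try setting the logging level to DEBUG, or change log.debug(b.utf8String) to log.info(b.utf8String).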
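
Both possibilities can be checked in isolation with a small standalone program along the following lines. This is only a sketch, assuming Akka 2.5.x (where Sink.actorRefWithAck and ActorMaterializer are the current APIs). The names ChunkCountingSink, SinkCheck and countChunks are made up for illustration, and Init, Ack and Complete are redefined as plain case objects because the question does not show their definitions. It raises akka.loglevel to DEBUG in code and also logs each chunk at info level, so chunks are visible either way.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Stand-ins for the protocol messages (assumed; not shown in the question).
case object Init
case object Ack
case object Complete

// Hypothetical sink actor: counts the chunks and reports the total on completion.
class ChunkCountingSink(done: Promise[Int]) extends Actor with ActorLogging {
  private var chunks = 0

  override def receive: Receive = {
    case Init =>
      sender() ! Ack // tell the stream we are ready for the first element
    case b: ByteString =>
      chunks += 1
      log.debug(b.utf8String) // only visible when DEBUG logging is enabled
      log.info("chunk {} ({} bytes)", chunks, b.length)
      sender() ! Ack // ask for the next element
    case Complete =>
      done.success(chunks)
  }
}

object SinkCheck {
  // Streams `data` into the sink actor and returns how many chunks it received.
  def countChunks(data: List[ByteString]): Int = {
    // Assumption: the default Akka logger is in use; with SLF4J/Logback the
    // backend's own level would have to allow DEBUG as well.
    val config = ConfigFactory
      .parseString("akka.loglevel = DEBUG")
      .withFallback(ConfigFactory.load())
    implicit val system: ActorSystem = ActorSystem("sink-check", config)
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val done = Promise[Int]()
    try {
      val ref = system.actorOf(Props(new ChunkCountingSink(done)), "chunkCountingSink")
      Source(data).runWith(Sink.actorRefWithAck(ref, Init, Ack, Complete))
      Await.result(done.future, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val sample = List(ByteString("hello "), ByteString("world"))
    println(s"chunks received: ${countChunks(sample)}")
  }
}
```

Calling countChunks with an empty list should reproduce the pattern in the question's log: the actor sees Init and then Complete, with nothing in between. If the real entity.dataBytes behaves the same way once chunks are logged at info level, the response body is empty and the log level is not the culprit.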