Akka stream to actor 问题
Akka stream to actor issue
我正在尝试使用 Sink.actorRefWithAck 将 http 响应流式传输到一个 actor,但不知何故,除了 init 和 complete 消息外,没有任何东西发送到这个 sink actor。不过,我可能遗漏了一些非常明显的东西。
Sink actor 正在启动中。这是流媒体演员的代码:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Sink as actorRef
val sink = Sink.actorRefWithAck(context.actorOf(TestJob2.props(), "testJob2Actor"), Init, Ack, Complete)
// Pipe the response body to the actor sink
entity.dataBytes.runWith(sink)
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
这里是接收器 actor 的代码:
override def receive: Receive = {
case Init =>
log.info("TestJob2Actor got init sink message")
sender ! Ack
case Complete => log.info("TestJob2Actor got complete sink message")
case b: ByteString =>
log.debug(b.utf8String)
sender ! Ack
case _ => log.warning("Unexpected message in TestJob2Actor")
}
输出:
2018-01-25 17:26:58,530 INFO com.mcma.actors.Supervisor akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor - SupervisorActor forwarded GetTestData message to TestJobActor
2018-01-25 17:26:59,243 INFO com.mcma.actors.jobs.TestJob akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor - TestJob actor started
2018-01-25 17:27:00,052 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2 actor started
2018-01-25 17:27:00,067 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got init sink message
2018-01-25 17:27:00,083 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got complete sink message
想法:(1) 来自 entity.dataBytes
的 Source
可能是空的,或者 (2) sink actor 实际上正在处理 ByteString
块,但是日志级别是这样的调试消息不可见。尝试将日志记录级别设置为 DEBUG,或将 log.debug(b.utf8String)
更改为 log.info(b.utf8String)
。
我正在尝试使用 Sink.actorRefWithAck 将 http 响应流式传输到一个 actor,但不知何故,除了 init 和 complete 消息外,没有任何东西发送到这个 sink actor。不过,我可能遗漏了一些非常明显的东西。 Sink actor 正在启动中。这是流媒体演员的代码:
override def receive: Receive = {
case GetTestData(p, id) =>
// Get the data and pipes it to itself through a message as recommended
http.singleRequest(HttpRequest(uri = uri.format(p, id)))
.pipeTo(self)
case HttpResponse(StatusCodes.OK, _, entity, _) =>
// Sink as actorRef
val sink = Sink.actorRefWithAck(context.actorOf(TestJob2.props(), "testJob2Actor"), Init, Ack, Complete)
// Pipe the response body to the actor sink
entity.dataBytes.runWith(sink)
case resp @ HttpResponse(code, _, _, _) =>
log.error("Request to test job failed, response code: " + code)
// Discard the flow to avoid backpressure
resp.discardEntityBytes()
case _ => log.warning("Unexpected message in TestJobActor")
}
这里是接收器 actor 的代码:
override def receive: Receive = {
case Init =>
log.info("TestJob2Actor got init sink message")
sender ! Ack
case Complete => log.info("TestJob2Actor got complete sink message")
case b: ByteString =>
log.debug(b.utf8String)
sender ! Ack
case _ => log.warning("Unexpected message in TestJob2Actor")
}
输出:
2018-01-25 17:26:58,530 INFO com.mcma.actors.Supervisor akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor - SupervisorActor forwarded GetTestData message to TestJobActor
2018-01-25 17:26:59,243 INFO com.mcma.actors.jobs.TestJob akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor - TestJob actor started
2018-01-25 17:27:00,052 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2 actor started
2018-01-25 17:27:00,067 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got init sink message
2018-01-25 17:27:00,083 INFO com.mcma.actors.jobs.TestJob2 akka.tcp://alor-system@10.33.135.82:8000/user/supervisorActor/testJobActor/testJob2Actor - TestJob2Actor got complete sink message
想法:(1) 来自 entity.dataBytes
的 Source
可能是空的,或者 (2) sink actor 实际上正在处理 ByteString
块,但是日志级别是这样的调试消息不可见。尝试将日志记录级别设置为 DEBUG,或将 log.debug(b.utf8String)
更改为 log.info(b.utf8String)
。