使用 akka http 的网络套接字单元测试用例
Unit test cases for web sockets using akka http
我已经使用 Akka HTTP 实现了网络套接字。我正在使用来自 Kafka 的数据并使用网络套接字发送通知。功能运行良好,但 我被困在测试用例中。
` private def loginNotificationRoute(): Route = {
cors() {
pathPrefix("notifications" / Segment) { userName =>
get {
handleWebSocketMessages(notificationClientFlow(userName))
}
}
}
}`
Notification Actor 从数据库读取通知并将数据发送回客户端。
` def notificationClientFlow(userName: String): Flow[Message, Message, NotUsed] = {
info(s"Connection Request accepted for user $userName")
val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message].map {
case TextMessage.Strict(text) => NotificationActor.IncomingMessage(text)
}.to(Sink.actorRef(notificationActor, PoisonPill))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[NotificationActor.ResponseType](BUFFER_SIZE, OverflowStrategy.fail)
.mapMaterializedValue { outgoingActor =>
notificationActor ! NotificationActor.Connected(outgoingActor)
NotUsed
}
.map {
notificationResponse: NotificationActor.ResponseType =>
info(s"sending notification ${notificationResponse.action}")
TextMessage.Strict(jsonHelper.write(notificationResponse))
}
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}`
重新设计依赖关系
在我看来,您当前设计的最大缺点是存在对特定 actorSystem
的内在依赖性;有问题的代码行是
//actorSystem comes from the outside world!!!
val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
notificationClientFlow
方法应该重新参数化,以便在参数中明确列出所有依赖项。并且不应该依赖整个 ActorSystem
,而应该减少到仅 ActorRef
:
def notificationClientFlow(userName: String,
notificationActor : ActorRef): Flow[Message, Message, NotUsed] = {
//notificationActor is now passed in
//val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
}
这需要在 Route
创建方法中进行类似的显式声明:
private def loginNotificationRoute(notificationActor : ActorRef)() : Route = {
...
handleWebSocketMessages(notificationClientFlow(userName, notificationActor))
}
测试
现在可以使用TestKit
进行测试:
class MySpec() extends TestKit(ActorSystem("MySpec")) {
val userName = "testUser"
val notificationActor =
system.actorOf(NotificationActor.props(userName, jsonHelper))
val testFlow = notificationClientFlow(userName, notificationActor)
}
此外,因为我们明确地通过了 ActorRef
而不仅仅是 ActorSystem
,我们可以使用其他高级测试功能,例如 probes:
val probe = TestProbe()
val testProbeFlow = notificationClientFlow(userName, probe.ref)
上述技术同样适用于使用 standard techniques:
测试路由
val testRoute = loginNotificationRoute(testProbe)
Get() ~> testRoute() ~> check {
//testing assertions here
}
我已经使用 Akka HTTP 实现了网络套接字。我正在使用来自 Kafka 的数据并使用网络套接字发送通知。功能运行良好,但 我被困在测试用例中。
` private def loginNotificationRoute(): Route = {
cors() {
pathPrefix("notifications" / Segment) { userName =>
get {
handleWebSocketMessages(notificationClientFlow(userName))
}
}
}
}`
Notification Actor 从数据库读取通知并将数据发送回客户端。
` def notificationClientFlow(userName: String): Flow[Message, Message, NotUsed] = {
info(s"Connection Request accepted for user $userName")
val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
val incomingMessages: Sink[Message, NotUsed] =
Flow[Message].map {
case TextMessage.Strict(text) => NotificationActor.IncomingMessage(text)
}.to(Sink.actorRef(notificationActor, PoisonPill))
val outgoingMessages: Source[Message, NotUsed] =
Source
.actorRef[NotificationActor.ResponseType](BUFFER_SIZE, OverflowStrategy.fail)
.mapMaterializedValue { outgoingActor =>
notificationActor ! NotificationActor.Connected(outgoingActor)
NotUsed
}
.map {
notificationResponse: NotificationActor.ResponseType =>
info(s"sending notification ${notificationResponse.action}")
TextMessage.Strict(jsonHelper.write(notificationResponse))
}
Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
}`
重新设计依赖关系
在我看来,您当前设计的最大缺点是存在对特定 actorSystem
的内在依赖性;有问题的代码行是
//actorSystem comes from the outside world!!!
val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
notificationClientFlow
方法应该重新参数化,以便在参数中明确列出所有依赖项。并且不应该依赖整个 ActorSystem
,而应该减少到仅 ActorRef
:
def notificationClientFlow(userName: String,
notificationActor : ActorRef): Flow[Message, Message, NotUsed] = {
//notificationActor is now passed in
//val notificationActor = actorSystem.actorOf(NotificationActor.props(userName, jsonHelper))
}
这需要在 Route
创建方法中进行类似的显式声明:
private def loginNotificationRoute(notificationActor : ActorRef)() : Route = {
...
handleWebSocketMessages(notificationClientFlow(userName, notificationActor))
}
测试
现在可以使用TestKit
进行测试:
class MySpec() extends TestKit(ActorSystem("MySpec")) {
val userName = "testUser"
val notificationActor =
system.actorOf(NotificationActor.props(userName, jsonHelper))
val testFlow = notificationClientFlow(userName, notificationActor)
}
此外,因为我们明确地通过了 ActorRef
而不仅仅是 ActorSystem
,我们可以使用其他高级测试功能,例如 probes:
val probe = TestProbe()
val testProbeFlow = notificationClientFlow(userName, probe.ref)
上述技术同样适用于使用 standard techniques:
测试路由val testRoute = loginNotificationRoute(testProbe)
Get() ~> testRoute() ~> check {
//testing assertions here
}