带有 Graph DSL 的 Websocket
Websocket with Graph DSL
我正在尝试使用 Akka Flow 实现 Websocket 登录流程。我收到无数关于入口、出口和连接问题的令人讨厌的运行时异常。
我最近遇到的一个异常是:
java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.
片段:
// First attempt from the question: tries to build a Source that performs a
// two-step websocket login. Kept verbatim because the surrounding text quotes
// the runtime exception this exact code produces.
object Login {
// Builds a Source from a hand-written GraphDSL graph.
// system:    actor system passed to Http(system)
// future:    supplies the LoginCommand.UserData element
// socketUrl: URL used for both WebSocketRequest instances
def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.fromFuture(future)
// The value returned by `named` is discarded here.
// NOTE(review): `named` presumably returns a renamed copy instead of mutating `in`,
// so this statement likely has no effect — confirm against the Akka API docs.
in.named("LoginData")
// Zip is not passed to builder.add, unlike exasolLogin below.
val fanIn = Zip[LoginResponse, LoginCommand.UserData]
// The only stage in this snippet that is registered with the builder.
val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))
// Constructs a FlowShape value from two existing ports; no transformation stage is defined here.
val encryptLoginData = FlowShape(exasolLogin.in, fanIn.out)
val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// `LoginCommand` is passed as a value (the object itself), not a constructed instance.
val announceLogin = Source.single(LoginCommand)
// `->` is Scala's pair-building arrow, not the GraphDSL `~>` operator: each of the
// next three lines only creates a tuple that is thrown away, so nothing gets connected.
in -> fanIn
announceLogin -> exasolAnnounce -> fanIn
fanIn -> encryptLoginData -> exasolLogin
// Only the outlet is exposed; exasolLogin's inlet was never wired, which presumably
// is the unconnected inlet reported by the IllegalStateException.
SourceShape(exasolLogin.out)
})
}
我可能完全错误地使用了 DSL,因为我还没有找到一篇深入解释图形、形状、流、物化值的文章。
有人可以指出我做错了什么或者应该怎么写吗?
编辑 1:
现已将 ->
替换为 ~>
并出现严重的编译错误:
// Second attempt from the question ("Edit 1"): `->` has been replaced by `~>`.
// Kept verbatim because the compiler output quoted after it refers to this exact code.
object Login {
// Builds a Source from a hand-written GraphDSL graph.
// system:    actor system passed to Http(system)
// future:    supplies the LoginCommand.UserData element
// socketUrl: URL used for both WebSocketRequest instances
def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.fromFuture(future)
// The value returned by `named` is discarded here.
// NOTE(review): likely a no-op if `named` returns a copy — confirm against the Akka API docs.
in.named("LoginData")
// Zip is now added to the builder, so its ports (in0, in1, out) can be wired explicitly.
val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
val exasolLogin = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// Keeps only the first tuple element, i.e. the LoginResponse.
// NOTE(review): the next stage in the chain (loginDataMessage) takes LoginCommand.UserData,
// which is the second tuple element — this looks like a type mismatch; confirm.
val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => data._1)
// Stub: ignores its input and always emits TextMessage("bar").
val loginDataMessage = Flow[LoginCommand.UserData].map(data => TextMessage("bar"))
val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// Stub: ignores the incoming message and always emits a fixed LoginResponse.
val announceResponse = Flow[Message].map(data => LoginResponse("key", "mod", "exp"))
// Stub: ignores its input and always emits TextMessage("foo").
val loginMessage = Flow[LoginCommand].map(data => TextMessage("foo"))
// Added to the builder so that its outlet can be returned in the SourceShape below.
// Stub: ignores the incoming message and always emits fixed SessionData.
val session = builder.add(Flow[Message].map(data => LoginCommand.SessionData(0, 1, "2", "db", "w", 59, 546, 45, "q", "TZ", "TZB")))
in ~> fanIn.in1
// This is the line the quoted compiler error points at: the source's element type is
// `LoginCommand.type` (the object), while loginMessage is declared as Flow[LoginCommand].
Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
fanIn.out ~> encryptLoginData ~> loginDataMessage ~> exasolLogin ~> session
SourceShape(session.out)
})
}
这导致
exasol-client/LoginGraph.scala:42: error: overloaded method value ~> with alternatives:
(to: akka.stream.SinkShape[exasol.LoginCommand.type])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
(to: akka.stream.Graph[akka.stream.SinkShape[exasol.LoginCommand.type], _])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
[Out](flow: akka.stream.FlowShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanOutShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanInShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](via: akka.stream.Graph[akka.stream.FlowShape[exasol.LoginCommand.type,Out],Any])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[U >: exasol.LoginCommand.type](to: akka.stream.Inlet[U])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit
cannot be applied to (akka.stream.FlowShape[exasol.LoginCommand,akka.http.scaladsl.model.ws.TextMessage.Strict])
Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
你需要这样的东西:
object Login {

  /** Builds a `Source` that performs a two-step websocket login.
    *
    * The announce branch sends `LoginCommand` over one websocket flow and decodes the
    * reply into a `LoginResponse`; that response is zipped with the user data supplied
    * by `future`, encoded, and sent over a second websocket flow whose output becomes
    * the output of the returned source.
    *
    * @param system    actor system passed to `Http(system)`
    * @param future    supplies the `LoginCommand.UserData` to send once the announce reply arrived
    * @param socketUrl URL used for both websocket requests
    * @return a source emitting the messages produced by the login websocket flow
    */
  def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String): Source[Message, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      // `named` returns a renamed copy, so it must be chained onto the source that is
      // actually wired below; calling it as a standalone statement discards the name.
      val in = Source.fromFuture(future).named("LoginData")

      // Only stages whose ports are referenced explicitly (fanIn.in0/in1/out,
      // exasolLogin.out) need builder.add; the rest are imported implicitly by `~>`.
      val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
      val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))

      val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => TextMessage(data.toString)) //stub
      val encryptAnnounceData = Flow[LoginCommand].map(data => TextMessage(data.toString)) //stub
      val decryptAnnounceData = Flow[Message].map(message => LoginResponse(message)) //stub
      val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))

      // NOTE(review): this emits the `LoginCommand` object itself; it only type-checks
      // against Flow[LoginCommand] if that object conforms to LoginCommand — confirm.
      val announceLogin = Source.single(LoginCommand)

      in ~> fanIn.in1
      announceLogin ~> encryptAnnounceData ~> exasolAnnounce ~> decryptAnnounceData ~> fanIn.in0
      fanIn.out ~> encryptLoginData ~> exasolLogin

      SourceShape(exasolLogin.out)
    })
}
请记住,-> 和 ~> 是不同的运算符(您应该使用 ~>)。并且仅当您要手动连接形状入口和出口时,才需要向构建器添加形状。
我正在尝试使用 Akka Flow 实现 Websocket 登录流程。我收到无数关于入口、出口和连接问题的令人讨厌的运行时异常。 我最近遇到的一个异常是:
java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected.
片段:
// First attempt from the question: tries to build a Source that performs a
// two-step websocket login. Kept verbatim because the surrounding text quotes
// the runtime exception this exact code produces.
object Login {
// Builds a Source from a hand-written GraphDSL graph.
// system:    actor system passed to Http(system)
// future:    supplies the LoginCommand.UserData element
// socketUrl: URL used for both WebSocketRequest instances
def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.fromFuture(future)
// The value returned by `named` is discarded here.
// NOTE(review): `named` presumably returns a renamed copy instead of mutating `in`,
// so this statement likely has no effect — confirm against the Akka API docs.
in.named("LoginData")
// Zip is not passed to builder.add, unlike exasolLogin below.
val fanIn = Zip[LoginResponse, LoginCommand.UserData]
// The only stage in this snippet that is registered with the builder.
val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))
// Constructs a FlowShape value from two existing ports; no transformation stage is defined here.
val encryptLoginData = FlowShape(exasolLogin.in, fanIn.out)
val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// `LoginCommand` is passed as a value (the object itself), not a constructed instance.
val announceLogin = Source.single(LoginCommand)
// `->` is Scala's pair-building arrow, not the GraphDSL `~>` operator: each of the
// next three lines only creates a tuple that is thrown away, so nothing gets connected.
in -> fanIn
announceLogin -> exasolAnnounce -> fanIn
fanIn -> encryptLoginData -> exasolLogin
// Only the outlet is exposed; exasolLogin's inlet was never wired, which presumably
// is the unconnected inlet reported by the IllegalStateException.
SourceShape(exasolLogin.out)
})
}
我可能完全错误地使用了 DSL,因为我还没有找到一篇深入解释图形、形状、流、物化值的文章。 有人可以指出我做错了什么或者应该怎么写吗?
编辑 1:
现已将 ->
替换为 ~>
并出现严重的编译错误:
// Second attempt from the question ("Edit 1"): `->` has been replaced by `~>`.
// Kept verbatim because the compiler output quoted after it refers to this exact code.
object Login {
// Builds a Source from a hand-written GraphDSL graph.
// system:    actor system passed to Http(system)
// future:    supplies the LoginCommand.UserData element
// socketUrl: URL used for both WebSocketRequest instances
def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String) =
Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val in = Source.fromFuture(future)
// The value returned by `named` is discarded here.
// NOTE(review): likely a no-op if `named` returns a copy — confirm against the Akka API docs.
in.named("LoginData")
// Zip is now added to the builder, so its ports (in0, in1, out) can be wired explicitly.
val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
val exasolLogin = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// Keeps only the first tuple element, i.e. the LoginResponse.
// NOTE(review): the next stage in the chain (loginDataMessage) takes LoginCommand.UserData,
// which is the second tuple element — this looks like a type mismatch; confirm.
val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => data._1)
// Stub: ignores its input and always emits TextMessage("bar").
val loginDataMessage = Flow[LoginCommand.UserData].map(data => TextMessage("bar"))
val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))
// Stub: ignores the incoming message and always emits a fixed LoginResponse.
val announceResponse = Flow[Message].map(data => LoginResponse("key", "mod", "exp"))
// Stub: ignores its input and always emits TextMessage("foo").
val loginMessage = Flow[LoginCommand].map(data => TextMessage("foo"))
// Added to the builder so that its outlet can be returned in the SourceShape below.
// Stub: ignores the incoming message and always emits fixed SessionData.
val session = builder.add(Flow[Message].map(data => LoginCommand.SessionData(0, 1, "2", "db", "w", 59, 546, 45, "q", "TZ", "TZB")))
in ~> fanIn.in1
// This is the line the quoted compiler error points at: the source's element type is
// `LoginCommand.type` (the object), while loginMessage is declared as Flow[LoginCommand].
Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
fanIn.out ~> encryptLoginData ~> loginDataMessage ~> exasolLogin ~> session
SourceShape(session.out)
})
}
这导致
exasol-client/LoginGraph.scala:42: error: overloaded method value ~> with alternatives:
(to: akka.stream.SinkShape[exasol.LoginCommand.type])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
(to: akka.stream.Graph[akka.stream.SinkShape[exasol.LoginCommand.type], _])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit <and>
[Out](flow: akka.stream.FlowShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanOutShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](junction: akka.stream.UniformFanInShape[exasol.LoginCommand.type,Out])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[Out](via: akka.stream.Graph[akka.stream.FlowShape[exasol.LoginCommand.type,Out],Any])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])akka.stream.scaladsl.GraphDSL.Implicits.PortOps[Out] <and>
[U >: exasol.LoginCommand.type](to: akka.stream.Inlet[U])(implicit b: akka.stream.scaladsl.GraphDSL.Builder[_])Unit
cannot be applied to (akka.stream.FlowShape[exasol.LoginCommand,akka.http.scaladsl.model.ws.TextMessage.Strict])
Source.single(LoginCommand) ~> loginMessage ~> exasolAnnounce ~> announceResponse ~> fanIn.in0
你需要这样的东西:
object Login {

  /** Builds a `Source` that performs a two-step websocket login.
    *
    * The announce branch sends `LoginCommand` over one websocket flow and decodes the
    * reply into a `LoginResponse`; that response is zipped with the user data supplied
    * by `future`, encoded, and sent over a second websocket flow whose output becomes
    * the output of the returned source.
    *
    * @param system    actor system passed to `Http(system)`
    * @param future    supplies the `LoginCommand.UserData` to send once the announce reply arrived
    * @param socketUrl URL used for both websocket requests
    * @return a source emitting the messages produced by the login websocket flow
    */
  def graph(system: ActorSystem, future: Future[LoginCommand.UserData], socketUrl: String): Source[Message, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      // `named` returns a renamed copy, so it must be chained onto the source that is
      // actually wired below; calling it as a standalone statement discards the name.
      val in = Source.fromFuture(future).named("LoginData")

      // Only stages whose ports are referenced explicitly (fanIn.in0/in1/out,
      // exasolLogin.out) need builder.add; the rest are imported implicitly by `~>`.
      val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])
      val exasolLogin = builder.add(Http(system).webSocketClientFlow(WebSocketRequest(socketUrl)))

      val encryptLoginData = Flow[(LoginResponse, LoginCommand.UserData)].map(data => TextMessage(data.toString)) //stub
      val encryptAnnounceData = Flow[LoginCommand].map(data => TextMessage(data.toString)) //stub
      val decryptAnnounceData = Flow[Message].map(message => LoginResponse(message)) //stub
      val exasolAnnounce = Http(system).webSocketClientFlow(WebSocketRequest(socketUrl))

      // NOTE(review): this emits the `LoginCommand` object itself; it only type-checks
      // against Flow[LoginCommand] if that object conforms to LoginCommand — confirm.
      val announceLogin = Source.single(LoginCommand)

      in ~> fanIn.in1
      announceLogin ~> encryptAnnounceData ~> exasolAnnounce ~> decryptAnnounceData ~> fanIn.in0
      fanIn.out ~> encryptLoginData ~> exasolLogin

      SourceShape(exasolLogin.out)
    })
}
请记住,-> 和 ~> 是不同的运算符(您应该使用 ~>)。并且仅当您要手动连接形状入口和出口时,才需要向构建器添加形状。