复合流(来自 Sink 和 Source)的目的是什么?
What is the purpose of the composite flow(from Sink and Source)?
我正在尝试从 website 中理解复合流(来自 Sink 和 Source),它们表示如下:
有人可以提供复合流的用法示例吗?
我应该什么时候使用它?
也许在某些情况下您只需要提供流程,而在某些情况下您需要一个 NoOp 流程。
那么你可以做
Flow.fromSinkAndSource(Sink.ignore,Source.empty)
或者忽略源中的每个元素并使用另一个元素
Flow.fromSinkAndSource(Sink.ignore,Source.tick(1.second,1.second,"something"))
Flow.fromSinkAndSource 提供了一种方便的方法来 assemble 一个 flow
由一个 sink
作为它的输入和一个 source
作为它的输出组成,而不是已连接,下图可以很好地说明这一点(在 API link 中可用):
+----------------------------------------------+
| Resulting Flow[I, O, NotUsed] |
| |
| +---------+ +-----------+ |
| | | | | |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
| | | | | |
| +---------+ +-----------+ |
+----------------------------------------------+
如@gabrielgiussi 的回答所示,它通常用于想要将现有 source
(或 flow
)的输出 "switch" 输出到某些不同输出的情况 - 对于测试目的或其他目的。这是一个简单的例子:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )
Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c
还值得注意的是,该方法的 "Mat" 版本,fromSinkAndSourceMat, has some interesting use cases. An example is to use it to keep half-closed
WebSockets open by using Source.maybe[T]
to maintain a Promise[Option[T]]
as the materialized value which will be completed when one wants to close the connection. Below is the sample code from the relevant section in the Akka-http WebSockets client support 文档:
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)
我从here
那里得到了理解
object SingleWebSocketRequest {
def main(args: Array[String]) = {
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)
val connected = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
// and handle errors more carefully
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
我实际用过这个,很方便。 Websocket 是一种双向连接,Akka-HTTP WebSocket 提供了 SingleWebSocketRequest 函数,该函数接受一个 flow in 参数,并在 joinMat 函数中使用它作为参数。使用此配置,您的源在这里起着关键作用,向 WebSocket 发送消息,而您的接收器用于从 WebSocket 接收消息。所以这不仅仅是:
来源 ~> 接收器
就像
其他来源 (WebSocket) ~> 接收器
其他来源 (WebSocket) <~ 来源(例如:每 15 秒发送一次 ping 消息)
我正在尝试从 website 中理解复合流(来自 Sink 和 Source),它们表示如下:
有人可以提供复合流的用法示例吗?
我应该什么时候使用它?
也许在某些情况下您只需要提供流程,而在某些情况下您需要一个 NoOp 流程。 那么你可以做
Flow.fromSinkAndSource(Sink.ignore,Source.empty)
或者忽略源中的每个元素并使用另一个元素
Flow.fromSinkAndSource(Sink.ignore,Source.tick(1.second,1.second,"something"))
Flow.fromSinkAndSource 提供了一种方便的方法来 assemble 一个 flow
由一个 sink
作为它的输入和一个 source
作为它的输出组成,而不是已连接,下图可以很好地说明这一点(在 API link 中可用):
+----------------------------------------------+
| Resulting Flow[I, O, NotUsed] |
| |
| +---------+ +-----------+ |
| | | | | |
I ~~>| Sink[I] | [no-connection!] | Source[O] | ~~> O
| | | | | |
| +---------+ +-----------+ |
+----------------------------------------------+
如@gabrielgiussi 的回答所示,它通常用于想要将现有 source
(或 flow
)的输出 "switch" 输出到某些不同输出的情况 - 对于测试目的或其他目的。这是一个简单的例子:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
val switchFlow = Flow.fromSinkAndSource( Sink.ignore, Source(List("a", "b", "c")) )
Source(1 to 5).via(switchFlow).runForeach(println)
// res1: scala.concurrent.Future[akka.Done] = Future(Success(Done))
// a
// b
// c
还值得注意的是,该方法的 "Mat" 版本,fromSinkAndSourceMat, has some interesting use cases. An example is to use it to keep half-closed
WebSockets open by using Source.maybe[T]
to maintain a Promise[Option[T]]
as the materialized value which will be completed when one wants to close the connection. Below is the sample code from the relevant section in the Akka-http WebSockets client support 文档:
// using Source.maybe materializes into a promise
// which will allow us to complete the source later
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
Sink.foreach[Message](println),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) =
Http().singleWebSocketRequest(
WebSocketRequest("ws://example.com:8080/some/path"),
flow)
// at some later time we want to disconnect
promise.success(None)
我从here
那里得到了理解object SingleWebSocketRequest {
def main(args: Array[String]) = {
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] representing the stream completion from above
val (upgradeResponse, closed) =
Http().singleWebSocketRequest(WebSocketRequest("ws://echo.websocket.org"), flow)
val connected = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Done
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
// and handle errors more carefully
connected.onComplete(println)
closed.foreach(_ => println("closed"))
}
}
我实际用过这个,很方便。 Websocket 是一种双向连接,Akka-HTTP WebSocket 提供了 SingleWebSocketRequest 函数,该函数接受一个 flow in 参数,并在 joinMat 函数中使用它作为参数。使用此配置,您的源在这里起着关键作用,向 WebSocket 发送消息,而您的接收器用于从 WebSocket 接收消息。所以这不仅仅是:
来源 ~> 接收器
就像
其他来源 (WebSocket) ~> 接收器
其他来源 (WebSocket) <~ 来源(例如:每 15 秒发送一次 ping 消息)