Akka Stream - Select 基于流中元素的接收器
Akka Stream - Select Sink based on Element in Flow
我正在使用 Akka 流创建一个简单的消息传递服务。该服务就像邮件投递一样,源中的元素包括 destination
和 content
,例如:
case class Message(destination: String, content: String)
并且服务应根据 destination
字段将消息传送到适当的接收器。我创建了一个 DeliverySink
class 让它有一个名字:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])
现在,我实例化了两个 DeliverySink
,让我称它们为 sinkX
和 sinkY
,并根据它们的名称创建了一个地图。实际上,我想提供一个接收器名称列表,并且该列表应该是可配置的。
我面临的挑战是如何根据destination
字段动态选择合适的接收器。
最后,我想将 Flow[Message]
映射到接收器。我试过了:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)
但是,显然这行不通,因为我们无法在地图之外引用 msg
...
我想这不是一个正确的方法。我还考虑过将 filter
与 broadcast
一起使用,但如果目标缩放到 100,我就无法键入每个路由。实现我的目标的正确方法是什么?
[编辑]
理想情况下,我想让目的地动态化。因此,我无法在过滤器或路由逻辑中静态键入所有目的地。如果目标接收器尚未连接,它也应该动态创建一个新接收器。
如果您必须使用多个水槽
Sink.combine
将直接满足您现有的要求。如果您在每个 Sink
之前附上适当的 Flow.filter
,那么他们只会收到适当的消息。
不要使用多个接收器
总的来说,我认为让流的结构和内容包含业务逻辑是糟糕的设计。您的流应该是在普通 scala/java 代码中的业务逻辑之上用于背压并发的薄单板。
在这种特殊情况下,我认为最好将目标路由包装在单个 Sink 中,并且逻辑应该在单独的函数中实现。例如:
val routeMessage : (Message) => Unit =
(message) =>
if(message.destination equalsIgnoreCase "stdout")
System.out println message.content
else if(message.destination equalsIgnoreCase "stderr")
System.err println message.content
val routeSink : Sink[Message, _] = Sink foreach routeMessage
请注意现在测试我的 routeMessage
是多么容易,因为它不在流内:我不需要任何 akka testkit "stuff" 来测试 routeMessage。如果我的并发设计要更改,我也可以将函数移动到 Future
或 Thread
。
许多目的地
如果您有多个目的地,您可以使用 Map
。例如,假设您要将消息发送到 AmazonSQS。您可以定义一个函数来将 Queue Name 转换为 Queue URL 并使用该函数来维护已创建名称的 Map:
type QueueName = String
val nameToRequest : (QueueName) => CreateQueueRequest = ??? //implementation unimportant
type QueueURL = String
val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
val nameToURL = mutable.Map.empty[QueueName, QueueURL]
(sqs) => (queueName) => nameToURL.get(queueName) match {
case Some(url) => url
case None => {
sqs.createQueue(nameToRequest(queueName))
val url = sqs.getQueueUrl(queueName).getQueueUrl()
nameToURL put (queueName, url)
url
}
}
}
现在您可以在单个 Sink 中使用此非流函数:
val sendMessage : (AmazonSQS) => (Message) => Unit =
(sqs) => (message) =>
sqs sendMessage {
(new SendMessageRequest())
.withQueueUrl(nameToURL(sqs)(message.destination))
.withMessageBody(message.content)
}
val sqs : AmazonSQS = ???
val messageSink = Sink foreach sendMessage(sqs)
旁注
对于 destination
你可能想使用 String
以外的东西。 coproduct 通常更好,因为它们可以与 case 语句一起使用,如果您错过其中一种可能性,您将获得有用的编译器错误:
sealed trait Destination
object Out extends Destination
object Err extends Destination
object SomethingElse extends Destination
case class Message(destination: Destination, content: String)
//This function won't compile because SomethingElse doesn't have a case
val routeMessage : (Message) => Unit =
(message) => message.destination match {
case Out =>
System.out.println(message.content)
case Err =>
System.err.println(message.content)
}
鉴于您的要求,也许您想考虑使用 groubBy:
将您的流源多路复用到子流中
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.{NotUsed, Done}
import akka.stream.IOResult
import scala.concurrent.Future
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
implicit val system = ActorSystem("sys")
implicit val materializer = ActorMaterializer()
import system.dispatcher
case class Message(destination: String, content: String)
case class DeliverySink(name: String, sink: Sink[ByteString, Future[IOResult]])
val messageSource: Source[Message, NotUsed] = Source(List(
Message("a", "uuu"), Message("a", "vvv"),
Message("b", "xxx"), Message("b", "yyy"), Message("b", "zzz")
))
val sinkA = DeliverySink("sink-a", FileIO.toPath(
Paths.get("/path/to/sink-a.txt"), options = Set(CREATE, WRITE)
))
val sinkB = DeliverySink("sink-b", FileIO.toPath(
Paths.get("/path/to/sink-b.txt"), options = Set(CREATE, WRITE)
))
val sinkMapping: Map[String, DeliverySink] = Map("a" -> sinkA, "b" -> sinkB)
val totalDests = 2
messageSource.map(m => (m.destination, m)).
groupBy(totalDests, _._1).
fold(("", List.empty[Message])) {
case ((_, list), (dest, msg)) => (dest, msg :: list)
}.
mapAsync(parallelism = totalDests) {
case (dest: String, msgList: List[Message]) =>
Source(msgList.reverse).map(_.content).map(ByteString(_)).
runWith(sinkMapping(dest).sink)
}.
mergeSubstreams.
runWith(Sink.ignore)
我正在使用 Akka 流创建一个简单的消息传递服务。该服务就像邮件投递一样,源中的元素包括 destination
和 content
,例如:
case class Message(destination: String, content: String)
并且服务应根据 destination
字段将消息传送到适当的接收器。我创建了一个 DeliverySink
class 让它有一个名字:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])
现在,我实例化了两个 DeliverySink
,让我称它们为 sinkX
和 sinkY
,并根据它们的名称创建了一个地图。实际上,我想提供一个接收器名称列表,并且该列表应该是可配置的。
我面临的挑战是如何根据destination
字段动态选择合适的接收器。
最后,我想将 Flow[Message]
映射到接收器。我试过了:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)
但是,显然这行不通,因为我们无法在地图之外引用 msg
...
我想这不是一个正确的方法。我还考虑过将 filter
与 broadcast
一起使用,但如果目标缩放到 100,我就无法键入每个路由。实现我的目标的正确方法是什么?
[编辑]
理想情况下,我想让目的地动态化。因此,我无法在过滤器或路由逻辑中静态键入所有目的地。如果目标接收器尚未连接,它也应该动态创建一个新接收器。
如果您必须使用多个水槽
Sink.combine
将直接满足您现有的要求。如果您在每个 Sink
之前附上适当的 Flow.filter
,那么他们只会收到适当的消息。
不要使用多个接收器
总的来说,我认为让流的结构和内容包含业务逻辑是糟糕的设计。您的流应该是在普通 scala/java 代码中的业务逻辑之上用于背压并发的薄单板。
在这种特殊情况下,我认为最好将目标路由包装在单个 Sink 中,并且逻辑应该在单独的函数中实现。例如:
val routeMessage : (Message) => Unit =
(message) =>
if(message.destination equalsIgnoreCase "stdout")
System.out println message.content
else if(message.destination equalsIgnoreCase "stderr")
System.err println message.content
val routeSink : Sink[Message, _] = Sink foreach routeMessage
请注意现在测试我的 routeMessage
是多么容易,因为它不在流内:我不需要任何 akka testkit "stuff" 来测试 routeMessage。如果我的并发设计要更改,我也可以将函数移动到 Future
或 Thread
。
许多目的地
如果您有多个目的地,您可以使用 Map
。例如,假设您要将消息发送到 AmazonSQS。您可以定义一个函数来将 Queue Name 转换为 Queue URL 并使用该函数来维护已创建名称的 Map:
type QueueName = String
val nameToRequest : (QueueName) => CreateQueueRequest = ??? //implementation unimportant
type QueueURL = String
val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
val nameToURL = mutable.Map.empty[QueueName, QueueURL]
(sqs) => (queueName) => nameToURL.get(queueName) match {
case Some(url) => url
case None => {
sqs.createQueue(nameToRequest(queueName))
val url = sqs.getQueueUrl(queueName).getQueueUrl()
nameToURL put (queueName, url)
url
}
}
}
现在您可以在单个 Sink 中使用此非流函数:
val sendMessage : (AmazonSQS) => (Message) => Unit =
(sqs) => (message) =>
sqs sendMessage {
(new SendMessageRequest())
.withQueueUrl(nameToURL(sqs)(message.destination))
.withMessageBody(message.content)
}
val sqs : AmazonSQS = ???
val messageSink = Sink foreach sendMessage(sqs)
旁注
对于 destination
你可能想使用 String
以外的东西。 coproduct 通常更好,因为它们可以与 case 语句一起使用,如果您错过其中一种可能性,您将获得有用的编译器错误:
sealed trait Destination
object Out extends Destination
object Err extends Destination
object SomethingElse extends Destination
case class Message(destination: Destination, content: String)
//This function won't compile because SomethingElse doesn't have a case
val routeMessage : (Message) => Unit =
(message) => message.destination match {
case Out =>
System.out.println(message.content)
case Err =>
System.err.println(message.content)
}
鉴于您的要求,也许您想考虑使用 groubBy:
将您的流源多路复用到子流中import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.{NotUsed, Done}
import akka.stream.IOResult
import scala.concurrent.Future
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
implicit val system = ActorSystem("sys")
implicit val materializer = ActorMaterializer()
import system.dispatcher
case class Message(destination: String, content: String)
case class DeliverySink(name: String, sink: Sink[ByteString, Future[IOResult]])
val messageSource: Source[Message, NotUsed] = Source(List(
Message("a", "uuu"), Message("a", "vvv"),
Message("b", "xxx"), Message("b", "yyy"), Message("b", "zzz")
))
val sinkA = DeliverySink("sink-a", FileIO.toPath(
Paths.get("/path/to/sink-a.txt"), options = Set(CREATE, WRITE)
))
val sinkB = DeliverySink("sink-b", FileIO.toPath(
Paths.get("/path/to/sink-b.txt"), options = Set(CREATE, WRITE)
))
val sinkMapping: Map[String, DeliverySink] = Map("a" -> sinkA, "b" -> sinkB)
val totalDests = 2
messageSource.map(m => (m.destination, m)).
groupBy(totalDests, _._1).
fold(("", List.empty[Message])) {
case ((_, list), (dest, msg)) => (dest, msg :: list)
}.
mapAsync(parallelism = totalDests) {
case (dest: String, msgList: List[Message]) =>
Source(msgList.reverse).map(_.content).map(ByteString(_)).
runWith(sinkMapping(dest).sink)
}.
mergeSubstreams.
runWith(Sink.ignore)