在 Akka 中实现基于内容的路由器模式
Implementing Content-Based Router Pattern in Akka
我正在尝试实施 content-based router in my Akka actor system and according to this document the ConsistentHashingRouter 是可行的方法。在阅读了它的官方文档之后,我仍然对如何使用这个内置的哈希路由器感到困惑。我认为那是因为路由器本身是 hash/key-based,而 Akka 文档作者选择使用的示例是涉及基于键值的缓存的场景......所以我无法分辨缓存使用哪些键以及哪些键被路由器使用!
举个简单的例子。假设我们有以下消息:
interface Notification {
// Doesn’t matter what’s here.
}
// Will eventually be emailed to someone.
class EmailNotification implements Notification {
// Doesn’t matter what’s here.
}
// Will eventually be sent to some XMPP client and on to a chatroom somewhere.
class ChatOpsNotifications implements Notification {
// Doesn’t matter what’s here.
}
等理论上我们可能有 20 Notification
个 impl。我希望能够在运行时将 Notification
发送到 actor/router 并让路由器将其路由到正确的 NotificationPubisher
:
interface NotificationPublisher<NOTIFICATION implements Notification> {
void send(NOTIFICATION notification)
}
class EmailNotificationPublisher extends UntypedActor implements NotificationPubisher<EmailNotification> {
@Override
void onReceive(Object message) {
if(message instanceof EmailNotification) {
send(message as EmailNotification)
}
}
@Override
void send(EmailNotification notification) {
// Use Java Mail, etc.
}
}
class ChatOpsNotificationPublisher extends UntypedActor implements NotificationPubisher<ChatOpsNotification> {
@Override
void onReceive(Object message) {
if(message instanceof ChatOpsNotification) {
send(message as ChatOpsNotification)
}
}
@Override
void send(ChatOpsNotification notification) {
// Use XMPP/Jabber client, etc.
}
}
现在我可以自己手动进行路由:
class ReinventingTheWheelRouter extends UntypedActor {
// Inject these via constructor
ActorRef emailNotificationPublisher
ActorRef chatOpsNotificationPublisher
// ...20 more publishers, etc.
@Override
void onReceive(Object message) {
ActorRef publisher
if(message instanceof EmailNotification) {
publisher = emailNotificationPublisher
} else if(message instanceof ChatOpsNotification) {
publisher = chatOpsNotificationPublisher
} else if(...) { ... } // 20 more publishers, etc.
publisher.tell(message, self)
}
}
或者我可以使用Akka-Camel module定义一个基于Camel的路由器并发送Notifications
到Camel路由器,但似乎Akka aready 有这个内置的解决方案,为什么不使用它呢?我只是不知道如何将那些 Akka 文档中的 Cache
示例翻译成我的 Notification
示例。 ConsistentHashingRouter
中的“key”有什么用?使这项工作有效的代码是什么样的?
当然,如果有任何答案可以帮助我解决这个问题,我将不胜感激,但如果可能的话,我更喜欢基于 Java 的代码片段。 Scala 对我来说就像象形文字。
我同意 Custom Router 比 ConsistentHashingRouter
更合适。在阅读了关于自定义路由器的文档后,我似乎会:
- 创建一个
GroupBase
impl 并直接向其发送消息(notificationGroup.tell(notification, self)
);然后
GroupBase
实现,比如说,NotificationGroup
将提供一个 Router
实例,该实例注入了我的自定义 RoutingLogic
实现
- 当
NotificationGroup
收到一条消息时,它会执行我的自定义 RoutingLogic#select
方法,该方法确定将消息发送给哪个 Routee
(我假设是某种演员?)
如果这是正确的(如果我错了请纠正我),那么路由选择魔术就发生在这里:
class MessageBasedRoutingLogic implements RoutingLogic {
@Override
Routee select(Object message, IndexedSeq<Routee> candidates) {
// How can I query the Routee interface and deterine whether the message at-hand is in fact
// appropriate to be routed to the candidate?
//
// For instance I'd like to say "If message is an instance of
// an EmailNotification, send it to EmailNotificationPublisher."
//
// How do I do this here?!?
if(message instanceof EmailNotification) {
// Need to find the candidate/Routee that is
// the EmailNotificationPublisher, but how?!?
}
}
}
但如您所见,我有一些心理实施障碍需要跨越。 Routee
界面并没有真正给我任何可以智能地用来决定特定 Routee
(候选人)是否适合手头消息的东西。
所以我问:(1)如何将消息映射到Routees
(有效地执行路由selection/logic)? (2) 首先如何将我的发布者添加为路由?并且 (3) 我的 NotificationPublisher
impls 是否仍然需要扩展 UntypedActor
或者他们现在应该实施 Routee
?
这是 Scala 中的一个简单的小型 A/B 路由器。我希望这对您有所帮助,即使您想要一个基于 Java 的答案。首先是路由逻辑:
class ABRoutingLogic(a:ActorRef, b:ActorRef) extends RoutingLogic{
val aRoutee = ActorRefRoutee(a)
val bRoutee = ActorRefRoutee(b)
def select(msg:Any, routees:immutable.IndexedSeq[Routee]):Routee = {
msg match{
case "A" => aRoutee
case _ => bRoutee
}
}
}
这里的关键是我在构造函数中传入我的 a
和 b
actor 引用,然后这些是我在 select
方法中路由到的引用。然后,这个逻辑的Group
:
case class ABRoutingGroup(a:ActorRef, b:ActorRef) extends Group {
val paths = List(a.path.toString, b.path.toString)
override def createRouter(system: ActorSystem): Router =
new Router(new ABRoutingLogic(a, b))
val routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
同样的事情,我正在通过构造函数使我想要路由到的演员可用。现在一个简单的演员 class 充当 a
和 b
:
class PrintingActor(letter:String) extends Actor{
def receive = {
case msg => println(s"I am $letter and I received letter $msg")
}
}
我将为此创建两个实例,每个实例分配不同的字母,这样我们就可以验证正确的实例是否根据路由逻辑获得了正确的消息。最后,一些测试代码:
object RoutingTest extends App{
val system = ActorSystem()
val a = system.actorOf(Props(classOf[PrintingActor], "A"))
val b = system.actorOf(Props(classOf[PrintingActor], "B"))
val router = system.actorOf(Props.empty.withRouter(ABRoutingGroup(a,b)))
router ! "A"
router ! "B"
}
如果你运行这个,你会看到:
I am A and I received letter A
I am B and I received letter B
这是一个非常简单的例子,但它展示了一种方法来做你想做的事。我希望你能将这段代码桥接到 Java 并用它来解决你的问题。
我正在尝试实施 content-based router in my Akka actor system and according to this document the ConsistentHashingRouter 是可行的方法。在阅读了它的官方文档之后,我仍然对如何使用这个内置的哈希路由器感到困惑。我认为那是因为路由器本身是 hash/key-based,而 Akka 文档作者选择使用的示例是涉及基于键值的缓存的场景......所以我无法分辨缓存使用哪些键以及哪些键被路由器使用!
举个简单的例子。假设我们有以下消息:
interface Notification {
// Doesn’t matter what’s here.
}
// Will eventually be emailed to someone.
class EmailNotification implements Notification {
// Doesn’t matter what’s here.
}
// Will eventually be sent to some XMPP client and on to a chatroom somewhere.
class ChatOpsNotifications implements Notification {
// Doesn’t matter what’s here.
}
等理论上我们可能有 20 Notification
个 impl。我希望能够在运行时将 Notification
发送到 actor/router 并让路由器将其路由到正确的 NotificationPubisher
:
interface NotificationPublisher<NOTIFICATION implements Notification> {
void send(NOTIFICATION notification)
}
class EmailNotificationPublisher extends UntypedActor implements NotificationPubisher<EmailNotification> {
@Override
void onReceive(Object message) {
if(message instanceof EmailNotification) {
send(message as EmailNotification)
}
}
@Override
void send(EmailNotification notification) {
// Use Java Mail, etc.
}
}
class ChatOpsNotificationPublisher extends UntypedActor implements NotificationPubisher<ChatOpsNotification> {
@Override
void onReceive(Object message) {
if(message instanceof ChatOpsNotification) {
send(message as ChatOpsNotification)
}
}
@Override
void send(ChatOpsNotification notification) {
// Use XMPP/Jabber client, etc.
}
}
现在我可以自己手动进行路由:
class ReinventingTheWheelRouter extends UntypedActor {
// Inject these via constructor
ActorRef emailNotificationPublisher
ActorRef chatOpsNotificationPublisher
// ...20 more publishers, etc.
@Override
void onReceive(Object message) {
ActorRef publisher
if(message instanceof EmailNotification) {
publisher = emailNotificationPublisher
} else if(message instanceof ChatOpsNotification) {
publisher = chatOpsNotificationPublisher
} else if(...) { ... } // 20 more publishers, etc.
publisher.tell(message, self)
}
}
或者我可以使用Akka-Camel module定义一个基于Camel的路由器并发送Notifications
到Camel路由器,但似乎Akka aready 有这个内置的解决方案,为什么不使用它呢?我只是不知道如何将那些 Akka 文档中的 Cache
示例翻译成我的 Notification
示例。 ConsistentHashingRouter
中的“key”有什么用?使这项工作有效的代码是什么样的?
当然,如果有任何答案可以帮助我解决这个问题,我将不胜感激,但如果可能的话,我更喜欢基于 Java 的代码片段。 Scala 对我来说就像象形文字。
我同意 Custom Router 比 ConsistentHashingRouter
更合适。在阅读了关于自定义路由器的文档后,我似乎会:
- 创建一个
GroupBase
impl 并直接向其发送消息(notificationGroup.tell(notification, self)
);然后 GroupBase
实现,比如说,NotificationGroup
将提供一个Router
实例,该实例注入了我的自定义RoutingLogic
实现- 当
NotificationGroup
收到一条消息时,它会执行我的自定义RoutingLogic#select
方法,该方法确定将消息发送给哪个Routee
(我假设是某种演员?)
如果这是正确的(如果我错了请纠正我),那么路由选择魔术就发生在这里:
class MessageBasedRoutingLogic implements RoutingLogic {
@Override
Routee select(Object message, IndexedSeq<Routee> candidates) {
// How can I query the Routee interface and deterine whether the message at-hand is in fact
// appropriate to be routed to the candidate?
//
// For instance I'd like to say "If message is an instance of
// an EmailNotification, send it to EmailNotificationPublisher."
//
// How do I do this here?!?
if(message instanceof EmailNotification) {
// Need to find the candidate/Routee that is
// the EmailNotificationPublisher, but how?!?
}
}
}
但如您所见,我有一些心理实施障碍需要跨越。 Routee
界面并没有真正给我任何可以智能地用来决定特定 Routee
(候选人)是否适合手头消息的东西。
所以我问:(1)如何将消息映射到Routees
(有效地执行路由selection/logic)? (2) 首先如何将我的发布者添加为路由?并且 (3) 我的 NotificationPublisher
impls 是否仍然需要扩展 UntypedActor
或者他们现在应该实施 Routee
?
这是 Scala 中的一个简单的小型 A/B 路由器。我希望这对您有所帮助,即使您想要一个基于 Java 的答案。首先是路由逻辑:
class ABRoutingLogic(a:ActorRef, b:ActorRef) extends RoutingLogic{
val aRoutee = ActorRefRoutee(a)
val bRoutee = ActorRefRoutee(b)
def select(msg:Any, routees:immutable.IndexedSeq[Routee]):Routee = {
msg match{
case "A" => aRoutee
case _ => bRoutee
}
}
}
这里的关键是我在构造函数中传入我的 a
和 b
actor 引用,然后这些是我在 select
方法中路由到的引用。然后,这个逻辑的Group
:
case class ABRoutingGroup(a:ActorRef, b:ActorRef) extends Group {
val paths = List(a.path.toString, b.path.toString)
override def createRouter(system: ActorSystem): Router =
new Router(new ABRoutingLogic(a, b))
val routerDispatcher: String = Dispatchers.DefaultDispatcherId
}
同样的事情,我正在通过构造函数使我想要路由到的演员可用。现在一个简单的演员 class 充当 a
和 b
:
class PrintingActor(letter:String) extends Actor{
def receive = {
case msg => println(s"I am $letter and I received letter $msg")
}
}
我将为此创建两个实例,每个实例分配不同的字母,这样我们就可以验证正确的实例是否根据路由逻辑获得了正确的消息。最后,一些测试代码:
object RoutingTest extends App{
val system = ActorSystem()
val a = system.actorOf(Props(classOf[PrintingActor], "A"))
val b = system.actorOf(Props(classOf[PrintingActor], "B"))
val router = system.actorOf(Props.empty.withRouter(ABRoutingGroup(a,b)))
router ! "A"
router ! "B"
}
如果你运行这个,你会看到:
I am A and I received letter A
I am B and I received letter B
这是一个非常简单的例子,但它展示了一种方法来做你想做的事。我希望你能将这段代码桥接到 Java 并用它来解决你的问题。