Second message becomes Unhandled in my Akka.NET actor and then it seems to halt
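Disclaimer: I'm new to Akka :)
I'm trying to implement a router in Akka that basically:
- receives a message
- looks up the IActorRef that handles the message type in a dictionary
- if no match is found, creates one as a child actor using Akka.DI and adds it to the dictionary
- forwards the message to that actor
This works great the first time, but if I Tell() or Ask() the router twice, the second message always ends up in the stream as Unhandled.
I've tried overriding Unhandled() in the child actor and putting a breakpoint there, and it is in fact hit on the second message.
The router: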
public class CommandRouter : UntypedActor
{
    protected readonly IActorResolver _resolver;
    private static readonly Dictionary<Type, IActorRef> _routees = new Dictionary<Type, IActorRef>();
    private ILoggingAdapter _log = Context.GetLogger(new SerilogLogMessageFormatter());

    public CommandRouter(IActorResolver resolver)
    {
        _resolver = resolver;
    }

    protected override void OnReceive(object message)
    {
        _log.Info("Routing command {cmd}", message);
        var typeKey = message.GetType();
        if (!_routees.ContainsKey(typeKey))
        {
            var props = CreateActorProps(typeKey);
            if (!props.Any())
            {
                Sender?.Tell(Response.WithException(
                    new RoutingException(
                        $"Could not route message to routee. No routees found for message type {typeKey.FullName}")));
                return;
            }
            if (props.Count() > 1)
            {
                Sender?.Tell(Response.WithException(
                    new RoutingException(
                        $"Multiple routees registered for message {typeKey.FullName}, which is not supported by this router. Did you want to publish stuff instead?")));
                return;
            }
            var prop = props.First();
            var routee = Context.ActorOf(prop, prop.Type.Name);
            _routees.Add(typeKey, routee);
        }
        _routees[typeKey].Forward(message);
    }

    private IEnumerable<Props> CreateActorProps(Type messageType)
    {
        return _resolver.TryCreateActorProps(typeof(IHandleCommand<>).MakeGenericType(messageType)).ToList();
    }

    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(x => Directive.Restart);
    }
}
The ActorResolver method, which uses the DependencyResolver from Akka.DI.StructureMap:
public IEnumerable<Props> TryCreateActorProps(Type actorType)
{
    foreach (var type in _container.GetAllInstances(actorType))
    {
        yield return _resolver.Create(type.GetType());
    }
}
The actual child actor is quite straightforward:
public class ProductSubscriptionHandler : ReceiveActor, IHandleCommand<AddProductSubscription>
{
    public ProductSubscriptionHandler()
    {
        Receive<AddProductSubscription>(Handle);
    }

    protected bool Handle(AddProductSubscription command)
    {
        Sender?.Tell(Response.Empty);
        return true;
    }
}
The whole thing is invoked after the actor system is initialized, like this:
var router = Sys.ActorOf(resolver.Create<CommandRouter>(), ActorNames.CommandRouter);
router.Ask(new AddProductSubscription());
router.Ask(new AddProductSubscription());
I keep getting this error on the second (or any subsequent) message: "Unhandled message from...":
[INFO][17-07-2017 23:05:39][Thread 0003][[akka://pos-system/user/CommandRouter#676182398]] Routing command Commands.AddProductSubscription
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] now supervising akka://pos-system/user/CommandRouter/ProductSubscriptionHandler
[DEBUG][17-07-2017 23:05:39][Thread 0003][akka://pos-system/user/CommandRouter] *Unhandled message from akka://pos-system/temp/d* : Documents.Commands.AddProductSubscription
[DEBUG][17-07-2017 23:05:39][Thread 0007][akka://pos-system/user/CommandRouter/ProductSubscriptionHandler] Started (Consumers.Service.Commands.ProductSubscriptionHandler)
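So, it turns out there was a much simpler (and working) solution to my problem: just register and start all the routee actors in the CommandRouter constructor instead of on each receive.
My code now looks a lot simpler as well:
CommandRouter: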
public class CommandRouterActor : UntypedActor
{
    public Dictionary<Type, IActorRef> RoutingTable { get; }
    private ILoggingAdapter _log = Context.GetLogger(new SerilogLogMessageFormatter());

    public CommandRouterActor(IActorResolver resolver)
    {
        // Create one child actor per registered command handler up front.
        var props = resolver.CreateCommandHandlerProps();
        RoutingTable = props.ToDictionary(k => k.Item1, v => Context.ActorOf(v.Item2, $"CommandHandler-{v.Item1.Name}"));
    }

    protected override void OnReceive(object message)
    {
        _log.Info("Routing command {cmd}", message);
        var typeKey = message.GetType();
        if (!RoutingTable.ContainsKey(typeKey))
        {
            Sender?.Tell(Response.WithException(
                new RoutingException(
                    $"Could not route message to routee. No routees found for message type {typeKey.FullName}")));
            _log.Info("Could not route command {cmd}, no routes found", message);
            // Stop here: indexing a missing key below would throw KeyNotFoundException.
            return;
        }
        RoutingTable[typeKey].Forward(message);
    }

    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(x => Directive.Restart);
    }
}
And my ActorResolver (used in the ctor above) simply queries the StructureMap container model for all registered IHandleCommand<> plugin types:
public IEnumerable<Tuple<Type, Props>> CreateCommandHandlerProps()
{
    var handlerTypes =
        _container.Model.AllInstances.Where(
                i =>
                    i.PluginType.IsGenericType && i.PluginType.GetGenericTypeDefinition() ==
                    typeof(IHandleCommand<>))
            .Select(m => m.PluginType);
    foreach (var handler in handlerTypes)
    {
        yield return new Tuple<Type, Props>(handler.GenericTypeArguments.First(), _resolver.Create(handler));
    }
}
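For anyone who wants to see the type-matching part of that resolver in isolation, here is a minimal sketch that runs without Akka or StructureMap. It is not my actual resolver: `HandlerScanner`, `FindHandlers` and the empty stand-in types are made up for illustration, and it scans a plain list of types instead of the container model. The `IsGenericType` / `GetGenericTypeDefinition()` / `GenericTypeArguments` checks are the same idea as above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the types used in the post (no Akka involved).
public interface IHandleCommand<TCommand> { }
public class AddProductSubscription { }
public class ProductSubscriptionHandler : IHandleCommand<AddProductSubscription> { }

public static class HandlerScanner
{
    // Yields (command type, handler type) pairs for every concrete class
    // among the candidates that implements IHandleCommand<T>.
    public static IEnumerable<Tuple<Type, Type>> FindHandlers(IEnumerable<Type> candidates)
    {
        foreach (var candidate in candidates.Where(t => t.IsClass && !t.IsAbstract))
        {
            var handlerInterfaces = candidate.GetInterfaces()
                .Where(i => i.IsGenericType &&
                            i.GetGenericTypeDefinition() == typeof(IHandleCommand<>));

            foreach (var handlerInterface in handlerInterfaces)
            {
                // The single generic argument is the command type the handler accepts.
                yield return Tuple.Create(handlerInterface.GenericTypeArguments[0], candidate);
            }
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Build the command-type -> handler-type table once, up front.
        var table = HandlerScanner
            .FindHandlers(typeof(Program).Assembly.GetTypes())
            .ToDictionary(pair => pair.Item1, pair => pair.Item2);

        Console.WriteLine(table[typeof(AddProductSubscription)].Name);
    }
}
```

With only the types above in the assembly, this prints `ProductSubscriptionHandler`. Note that `ToDictionary` throws if two handlers are registered for the same command type, which at least makes the "multiple routees" case fail loudly at startup rather than on a later message.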