Akka.net actor 无法处理低延迟消息?

Akka.net actor can't handle low latency messages?

我有这个work-flow:

  1. 里面的一个方法parent

            Receive<UpdatePositionCmd>(cmd =>
        {                
            cmd.Qty = 150;
            cmd.Price = 100;
            _positionCoordinatorActor.Tell(cmd);
    
            Thread.Sleep(30);
    
            cmd.Qty = 250;
            cmd.Price = 200;
            _positionCoordinatorActor.Tell(cmd);
    
            Thread.Sleep(30);
    
            cmd.Qty = 133;
            cmd.Price = 300;
            _positionCoordinatorActor.Tell(cmd);
        });
    
  2. 位置协调器中的方法,它从#1 接收消息,找到合适的 child 并转发消息:

            Command<UpdatePositionCmd>(cmd =>
        {
            var child = LookupChild(cmd.PositionName);
            if (child != ActorRefs.Nobody)
            {
                child.Tell(cmd);
            }
            else
            {
                var @event = new PositionUpdatedEvent(cmd);
    
                Persist(@event, positionUpdatedEvent =>
                {
                    var childActor = Context.ActorOf(Props.Create(() => new PositionActor()), cmd.PositionName);
                    childActor.Tell(cmd);
                });
            }
        });
    
  3. child 中获取转发消息的方法:

            Command<UpdatePositionCmd>(cmd =>
        {
            Console.Write($"\nCmd Qty: {cmd.Qty}");
            Qty += cmd.Qty;
        });
    

所以我的问题:"Tells" 之间的 Thread.Sleep(30) 我得到正确的输出:

命令数量:150
命令数量:250
命令数量:133
总数量:533

但是我应该删除或减少 Thread.Sleep(30) 我得到的结果很乱,基本上它只读取了最后一条消息,但读取了三遍:

命令数量:133
命令数量:133
命令数量:133
总数量:399

请帮忙。谢谢!

A method inside parent 收到一个 UpdatePositionCmd 实例,将其传递给协调器,然后 更新这个相同的命令 。随着延迟,这不太明显,因为命令是在第一次处理完成后第二次发出的。

您应该为每次调用 _positionCoordinatorActor.Tell 创建一个新实例,并使 UpdatePositionCmd 不可变,这样您就不会无意中更改已经发送的实例。