如何在纯 C# 中制作队列消息代理

How to make a queued message broker in pure C#

背景

我需要一个排队消息代理以分布式(连续帧)方式调度消息。在下面显示的示例中,它将处理不超过 10 个订阅者,然后在进一步处理之前等待下一帧。

(为了向不熟悉 Unity3D 的人说明,Process() 方法是 运行 使用 Unity 的内置 StartCoroutine() 方法,并且 - 在这种情况下 - 将持续游戏的生命周期 - 从队列中等待或处理。)

所以我有这么一个比较简单的class:

public class MessageBus : IMessageBus
{
    private const int LIMIT = 10;
    private readonly WaitForSeconds Wait;

    private Queue<IMessage> Messages;
    private Dictionary<Type, List<Action<IMessage>>> Subscribers;

    public MessageBus()
    {
        Wait = new WaitForSeconds(2f);
        Messages = new Queue<IMessage>();
        Subscribers = new Dictionary<Type, List<Action<IMessage>>>();
    }

    public void Submit(IMessage message)
    {
        Messages.Enqueue(message);
    }

    public IEnumerator Process()
    {
        var processed = 0;

        while (true)
        {
            if (Messages.Count == 0)
            {
                yield return Wait;
            }
            else
            {
                while(Messages.Count > 0)
                {
                    var message = Messages.Dequeue();

                    foreach (var subscriber in Subscribers[message.GetType()])
                    {
                        if (processed >= LIMIT)
                        {
                            processed = 0;
                            yield return null;
                        }

                        processed++;
                        subscriber?.Invoke(message);
                    }
                }

                processed = 0;
            }
        }
    }

    public void Subscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            Subscribers[typeof(T)] = new List<Action<IMessage>>();
        }

        Subscribers[typeof(T)].Add(handler);
    }

    public void Unsubscribe<T>(Action<IMessage> handler) where T : IMessage
    {
        if (!Subscribers.ContainsKey(typeof(T)))
        {
            return;
        }

        Subscribers[typeof(T)].Remove(handler);
    }
}

它的工作和行为符合预期,但有一个问题。

问题

我想这样使用它(从订阅者的角度来看):

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(TestEvent message)
{
    message.SomeTestEventMethod();
}

但这显然失败了,因为Action<IMessage>无法转换为Action<TestEvent>

我唯一可以使用它的方法是这样的:

public void Run()
{
    MessageBus.Subscribe<TestEvent>(OnTestEvent);
}

public void OnTestEvent(IMessage message)
{
    ((TestEvent)message).SomeTestEventMethod();
}

但这感觉很不雅而且非常浪费,因为每个订阅者都需要自己进行转换。

我试过的

我正在尝试 "casting" 这样的操作:

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Action<IMessage>>();
    }

    Subscribers[typeof(T)].Add((IMessage a) => handler((T)a));
}

这适用于 subscribe 部分,但显然不适用于 unsubscribe。我可以将新创建​​的 handler-wrapper-lambdas 缓存在某个地方,以便在取消订阅时使用,但老实说,我认为这不是真正的解决方案。

问题

我怎样才能让它像我想要的那样工作?如果可能,最好使用一些 C# "magic",但我知道它可能需要完全不同的方法。

因为这将在游戏中使用,并且运行它的生命周期我想要一个无垃圾的解决方案。

所以问题是您正试图将不同类型的列表存储为订阅者字典中的值。

解决这个问题的一种方法可能是存储 List<Delegate> 然后使用 Delegate.DynamicInvoke.

下面是一些总结要点的测试代码:

Dictionary<Type, List<Delegate>> Subscribers = new Dictionary<Type, List<Delegate>>();

void Main()
{
    Subscribe<Evt>(ev => Console.WriteLine($"hello {ev.Message}"));
    IMessage m = new Evt("spender");
    foreach (var subscriber in Subscribers[m.GetType()])
    {
        subscriber?.DynamicInvoke(m);
    }
}

public void Subscribe<T>(Action<T> handler) where T : IMessage
{
    if (!Subscribers.ContainsKey(typeof(T)))
    {
        Subscribers[typeof(T)] = new List<Delegate>();
    }
    Subscribers[typeof(T)].Add(handler);
}

public interface IMessage{}

public class Evt : IMessage
{
    public Evt(string message)
    {
        this.Message = message;
    }
    public string Message { get; }
}