如何在纯 C# 中制作队列消息代理
How to make a queued message broker in pure C#
背景
我需要一个排队消息代理以分布式(连续帧)方式调度消息。在下面显示的示例中,它将处理不超过 10 个订阅者,然后在进一步处理之前等待下一帧。
(为了向不熟悉 Unity3D 的人说明,Process()
方法是 运行 使用 Unity 的内置 StartCoroutine()
方法,并且 - 在这种情况下 - 将持续游戏的生命周期 - 从队列中等待或处理。)
所以我有这么一个比较简单的class:
public class MessageBus : IMessageBus
{
private const int LIMIT = 10;
private readonly WaitForSeconds Wait;
private Queue<IMessage> Messages;
private Dictionary<Type, List<Action<IMessage>>> Subscribers;
public MessageBus()
{
Wait = new WaitForSeconds(2f);
Messages = new Queue<IMessage>();
Subscribers = new Dictionary<Type, List<Action<IMessage>>>();
}
public void Submit(IMessage message)
{
Messages.Enqueue(message);
}
public IEnumerator Process()
{
var processed = 0;
while (true)
{
if (Messages.Count == 0)
{
yield return Wait;
}
else
{
while(Messages.Count > 0)
{
var message = Messages.Dequeue();
foreach (var subscriber in Subscribers[message.GetType()])
{
if (processed >= LIMIT)
{
processed = 0;
yield return null;
}
processed++;
subscriber?.Invoke(message);
}
}
processed = 0;
}
}
}
public void Subscribe<T>(Action<IMessage> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Action<IMessage>>();
}
Subscribers[typeof(T)].Add(handler);
}
public void Unsubscribe<T>(Action<IMessage> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
return;
}
Subscribers[typeof(T)].Remove(handler);
}
}
它的工作和行为符合预期,但有一个问题。
问题
我想这样使用它(从订阅者的角度来看):
public void Run()
{
MessageBus.Subscribe<TestEvent>(OnTestEvent);
}
public void OnTestEvent(TestEvent message)
{
message.SomeTestEventMethod();
}
但这显然失败了,因为Action<IMessage>
无法转换为Action<TestEvent>
。
我唯一可以使用它的方法是这样的:
public void Run()
{
MessageBus.Subscribe<TestEvent>(OnTestEvent);
}
public void OnTestEvent(IMessage message)
{
((TestEvent)message).SomeTestEventMethod();
}
但这感觉很不雅而且非常浪费,因为每个订阅者都需要自己进行转换。
我试过的
我正在尝试 "casting" 这样的操作:
public void Subscribe<T>(Action<T> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Action<IMessage>>();
}
Subscribers[typeof(T)].Add((IMessage a) => handler((T)a));
}
这适用于 subscribe 部分,但显然不适用于 unsubscribe。我可以将新创建的 handler-wrapper-lambdas 缓存在某个地方,以便在取消订阅时使用,但老实说,我认为这不是真正的解决方案。
问题
我怎样才能让它像我想要的那样工作?如果可能,最好使用一些 C# "magic",但我知道它可能需要完全不同的方法。
也因为这将在游戏中使用,并且运行它的生命周期我想要一个无垃圾的解决方案。
所以问题是您正试图将不同类型的列表存储为订阅者字典中的值。
解决这个问题的一种方法可能是存储 List<Delegate>
然后使用 Delegate.DynamicInvoke
.
下面是一些总结要点的测试代码:
Dictionary<Type, List<Delegate>> Subscribers = new Dictionary<Type, List<Delegate>>();
void Main()
{
Subscribe<Evt>(ev => Console.WriteLine($"hello {ev.Message}"));
IMessage m = new Evt("spender");
foreach (var subscriber in Subscribers[m.GetType()])
{
subscriber?.DynamicInvoke(m);
}
}
public void Subscribe<T>(Action<T> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Delegate>();
}
Subscribers[typeof(T)].Add(handler);
}
public interface IMessage{}
public class Evt : IMessage
{
public Evt(string message)
{
this.Message = message;
}
public string Message { get; }
}
背景
我需要一个排队消息代理以分布式(连续帧)方式调度消息。在下面显示的示例中,它将处理不超过 10 个订阅者,然后在进一步处理之前等待下一帧。
(为了向不熟悉 Unity3D 的人说明,Process()
方法是 运行 使用 Unity 的内置 StartCoroutine()
方法,并且 - 在这种情况下 - 将持续游戏的生命周期 - 从队列中等待或处理。)
所以我有这么一个比较简单的class:
public class MessageBus : IMessageBus
{
private const int LIMIT = 10;
private readonly WaitForSeconds Wait;
private Queue<IMessage> Messages;
private Dictionary<Type, List<Action<IMessage>>> Subscribers;
public MessageBus()
{
Wait = new WaitForSeconds(2f);
Messages = new Queue<IMessage>();
Subscribers = new Dictionary<Type, List<Action<IMessage>>>();
}
public void Submit(IMessage message)
{
Messages.Enqueue(message);
}
public IEnumerator Process()
{
var processed = 0;
while (true)
{
if (Messages.Count == 0)
{
yield return Wait;
}
else
{
while(Messages.Count > 0)
{
var message = Messages.Dequeue();
foreach (var subscriber in Subscribers[message.GetType()])
{
if (processed >= LIMIT)
{
processed = 0;
yield return null;
}
processed++;
subscriber?.Invoke(message);
}
}
processed = 0;
}
}
}
public void Subscribe<T>(Action<IMessage> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Action<IMessage>>();
}
Subscribers[typeof(T)].Add(handler);
}
public void Unsubscribe<T>(Action<IMessage> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
return;
}
Subscribers[typeof(T)].Remove(handler);
}
}
它的工作和行为符合预期,但有一个问题。
问题
我想这样使用它(从订阅者的角度来看):
public void Run()
{
MessageBus.Subscribe<TestEvent>(OnTestEvent);
}
public void OnTestEvent(TestEvent message)
{
message.SomeTestEventMethod();
}
但这显然失败了,因为Action<IMessage>
无法转换为Action<TestEvent>
。
我唯一可以使用它的方法是这样的:
public void Run()
{
MessageBus.Subscribe<TestEvent>(OnTestEvent);
}
public void OnTestEvent(IMessage message)
{
((TestEvent)message).SomeTestEventMethod();
}
但这感觉很不雅而且非常浪费,因为每个订阅者都需要自己进行转换。
我试过的
我正在尝试 "casting" 这样的操作:
public void Subscribe<T>(Action<T> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Action<IMessage>>();
}
Subscribers[typeof(T)].Add((IMessage a) => handler((T)a));
}
这适用于 subscribe 部分,但显然不适用于 unsubscribe。我可以将新创建的 handler-wrapper-lambdas 缓存在某个地方,以便在取消订阅时使用,但老实说,我认为这不是真正的解决方案。
问题
我怎样才能让它像我想要的那样工作?如果可能,最好使用一些 C# "magic",但我知道它可能需要完全不同的方法。
也因为这将在游戏中使用,并且运行它的生命周期我想要一个无垃圾的解决方案。
所以问题是您正试图将不同类型的列表存储为订阅者字典中的值。
解决这个问题的一种方法可能是存储 List<Delegate>
然后使用 Delegate.DynamicInvoke
.
下面是一些总结要点的测试代码:
Dictionary<Type, List<Delegate>> Subscribers = new Dictionary<Type, List<Delegate>>();
void Main()
{
Subscribe<Evt>(ev => Console.WriteLine($"hello {ev.Message}"));
IMessage m = new Evt("spender");
foreach (var subscriber in Subscribers[m.GetType()])
{
subscriber?.DynamicInvoke(m);
}
}
public void Subscribe<T>(Action<T> handler) where T : IMessage
{
if (!Subscribers.ContainsKey(typeof(T)))
{
Subscribers[typeof(T)] = new List<Delegate>();
}
Subscribers[typeof(T)].Add(handler);
}
public interface IMessage{}
public class Evt : IMessage
{
public Evt(string message)
{
this.Message = message;
}
public string Message { get; }
}