C#异步处理N个事件

C# Handling N events asynchronously

所以交易是我有一个处理程序侦听队列,消息从另一个服务推送到它。我知道有多少消息 应该 到达队列,但我如何在测试中验证这一点?

假设我有一个这样的处理程序 seutp:

public class Handler<TMessage> : IHandleMessages<TMessage>
{
    public TMessage Message { get; private set; }

    public async Task Handle(TMessage message)
    {
        await Task.Run(() =>
            {
                Message = message;
            })
            .ConfigureAwait(continueOnCapturedContext: false);
    }
}

所以每当 TMessage 被放入队列时,它就会被消耗。

现在我想要一些东西来验证我确实收到了我期望的消息数量,我尝试了以下操作:

public async Task VerifyReceivedMessages()
{
    //I'm expecting 5 files (how and why is not relevant in this case)
    const int numberOfMessages = 5;
    int numberOfReceivedMessages = 0;

    var receivedMessages = new List<TMessage>();

    var handler = new Handler<TMessage>(new List<TMessage>());

    while (numberOfReceivedMessages < numberOfMessages)
    {
        handler.WaitForMessage();
        foreach (var message in handler.MessageList)
        {
            if (!receivedMessages.Contains(message))
            {
                receivedMessages.Add(message);
                numberOfReceivedMessages = receivedMessages.Count;
            }
        }
    }
    //I rarely get to this one before the program terminates.
    await SaveReceivedMessages(receivedMessages);
}

为了让它工作,我也扩展了处理程序,所以它现在包括一个等待消息的方法以及一个消息列表,每当它收到一个消息时它就会添加到该列表中:

public class Handler<TMessage> : IHandleMessages<TMessage>
{
    public TMessage Message { get; private set; }
    public List<TMessage> MessageList { get; set; }

    public Handler(List<TMessage> messageList)
    {
        MessageList = messageList;
    }

    public async Task Handle(TMessage message)
    {
        await Task.Run(() =>
            {
                Message = message;
                ReceivedMessages(Message);
            })
            .ConfigureAwait(continueOnCapturedContext: false);
    }

    private void ReceivedMessages(TMessage Message)
    {
        MessageList.Add(message);
    }

    public void WaitForMessage(int sleepMsCycle = 100, int sleepLimitMs = 60000)
    {
        long maxWait = DateTime.UtcNow.Ticks + (sleepLimitMs * 10000);

        while(Message == null && DateTime.UtcNow.Ticks <= maxWait)
        {
            Thread.Sleep(sleepMsCycle);
        }

        long now = DateTime.UtcNow.Ticks;
        if (now > maxWait)
        {
            //Some exception handling...
        }
    }
}

处理程序本身似乎可以工作。每当发布服务发送消息时,处理程序都会将其拾取。问题是,发布者可以在不同的时间间隔发送 5 条消息,有时很快,有时可能需要几秒钟。

目前,我想不出一个好的方法来接收 VerifyReceivedMessages() 中的消息。当前的解决方案有时只提取 1、2 或 3(对我来说似乎是随机的),让剩余的消息在队列中排队,然后在 foreach.

的下一次迭代终止之前

有什么建议吗?

所以我有机会睡在这个上面,结果它不起作用的原因是因为我试图从列表中读取,处理程序在同一时间也可能访问foreach。添加 .ToList() 可以,但更好的是,将其放入线程安全列表也可以解决问题。

public ConcurrentDictionary<TMessage, string> MessageList { get; set; }

public Handler(ConcurrentDictionary<TMessage, string> messageList)
{
    MessageList = messageList;
}

...

private void ReceivedMessages(TMessage Message)
{
    MessageList.TryAdd(message, "");
}