C#异步处理N个事件
C# Handling N events asynchronously
所以交易是我有一个处理程序侦听队列,消息从另一个服务推送到它。我知道有多少消息 应该 到达队列,但我如何在测试中验证这一点?
假设我有一个这样的处理程序 seutp:
public class Handler<TMessage> : IHandleMessages<TMessage>
{
public TMessage Message { get; private set; }
public async Task Handle(TMessage message)
{
await Task.Run(() =>
{
Message = message;
})
.ConfigureAwait(continueOnCapturedContext: false);
}
}
所以每当 TMessage
被放入队列时,它就会被消耗。
现在我想要一些东西来验证我确实收到了我期望的消息数量,我尝试了以下操作:
public async Task VerifyReceivedMessages()
{
//I'm expecting 5 files (how and why is not relevant in this case)
const int numberOfMessages = 5;
int numberOfReceivedMessages = 0;
var receivedMessages = new List<TMessage>();
var handler = new Handler<TMessage>(new List<TMessage>());
while (numberOfReceivedMessages < numberOfMessages)
{
handler.WaitForMessage();
foreach (var message in handler.MessageList)
{
if (!receivedMessages.Contains(message))
{
receivedMessages.Add(message);
numberOfReceivedMessages = receivedMessages.Count;
}
}
}
//I rarely get to this one before the program terminates.
await SaveReceivedMessages(receivedMessages);
}
为了让它工作,我也扩展了处理程序,所以它现在包括一个等待消息的方法以及一个消息列表,每当它收到一个消息时它就会添加到该列表中:
public class Handler<TMessage> : IHandleMessages<TMessage>
{
public TMessage Message { get; private set; }
public List<TMessage> MessageList { get; set; }
public Handler(List<TMessage> messageList)
{
MessageList = messageList;
}
public async Task Handle(TMessage message)
{
await Task.Run(() =>
{
Message = message;
ReceivedMessages(Message);
})
.ConfigureAwait(continueOnCapturedContext: false);
}
private void ReceivedMessages(TMessage Message)
{
MessageList.Add(message);
}
public void WaitForMessage(int sleepMsCycle = 100, int sleepLimitMs = 60000)
{
long maxWait = DateTime.UtcNow.Ticks + (sleepLimitMs * 10000);
while(Message == null && DateTime.UtcNow.Ticks <= maxWait)
{
Thread.Sleep(sleepMsCycle);
}
long now = DateTime.UtcNow.Ticks;
if (now > maxWait)
{
//Some exception handling...
}
}
}
处理程序本身似乎可以工作。每当发布服务发送消息时,处理程序都会将其拾取。问题是,发布者可以在不同的时间间隔发送 5 条消息,有时很快,有时可能需要几秒钟。
目前,我想不出一个好的方法来接收 VerifyReceivedMessages()
中的消息。当前的解决方案有时只提取 1、2 或 3(对我来说似乎是随机的),让剩余的消息在队列中排队,然后在 foreach
.
的下一次迭代终止之前
有什么建议吗?
所以我有机会睡在这个上面,结果它不起作用的原因是因为我试图从列表中读取,处理程序在同一时间也可能访问foreach
。添加 .ToList()
可以,但更好的是,将其放入线程安全列表也可以解决问题。
public ConcurrentDictionary<TMessage, string> MessageList { get; set; }
public Handler(ConcurrentDictionary<TMessage, string> messageList)
{
MessageList = messageList;
}
...
private void ReceivedMessages(TMessage Message)
{
MessageList.TryAdd(message, "");
}
所以交易是我有一个处理程序侦听队列,消息从另一个服务推送到它。我知道有多少消息 应该 到达队列,但我如何在测试中验证这一点?
假设我有一个这样的处理程序 seutp:
public class Handler<TMessage> : IHandleMessages<TMessage>
{
public TMessage Message { get; private set; }
public async Task Handle(TMessage message)
{
await Task.Run(() =>
{
Message = message;
})
.ConfigureAwait(continueOnCapturedContext: false);
}
}
所以每当 TMessage
被放入队列时,它就会被消耗。
现在我想要一些东西来验证我确实收到了我期望的消息数量,我尝试了以下操作:
public async Task VerifyReceivedMessages()
{
//I'm expecting 5 files (how and why is not relevant in this case)
const int numberOfMessages = 5;
int numberOfReceivedMessages = 0;
var receivedMessages = new List<TMessage>();
var handler = new Handler<TMessage>(new List<TMessage>());
while (numberOfReceivedMessages < numberOfMessages)
{
handler.WaitForMessage();
foreach (var message in handler.MessageList)
{
if (!receivedMessages.Contains(message))
{
receivedMessages.Add(message);
numberOfReceivedMessages = receivedMessages.Count;
}
}
}
//I rarely get to this one before the program terminates.
await SaveReceivedMessages(receivedMessages);
}
为了让它工作,我也扩展了处理程序,所以它现在包括一个等待消息的方法以及一个消息列表,每当它收到一个消息时它就会添加到该列表中:
public class Handler<TMessage> : IHandleMessages<TMessage>
{
public TMessage Message { get; private set; }
public List<TMessage> MessageList { get; set; }
public Handler(List<TMessage> messageList)
{
MessageList = messageList;
}
public async Task Handle(TMessage message)
{
await Task.Run(() =>
{
Message = message;
ReceivedMessages(Message);
})
.ConfigureAwait(continueOnCapturedContext: false);
}
private void ReceivedMessages(TMessage Message)
{
MessageList.Add(message);
}
public void WaitForMessage(int sleepMsCycle = 100, int sleepLimitMs = 60000)
{
long maxWait = DateTime.UtcNow.Ticks + (sleepLimitMs * 10000);
while(Message == null && DateTime.UtcNow.Ticks <= maxWait)
{
Thread.Sleep(sleepMsCycle);
}
long now = DateTime.UtcNow.Ticks;
if (now > maxWait)
{
//Some exception handling...
}
}
}
处理程序本身似乎可以工作。每当发布服务发送消息时,处理程序都会将其拾取。问题是,发布者可以在不同的时间间隔发送 5 条消息,有时很快,有时可能需要几秒钟。
目前,我想不出一个好的方法来接收 VerifyReceivedMessages()
中的消息。当前的解决方案有时只提取 1、2 或 3(对我来说似乎是随机的),让剩余的消息在队列中排队,然后在 foreach
.
有什么建议吗?
所以我有机会睡在这个上面,结果它不起作用的原因是因为我试图从列表中读取,处理程序在同一时间也可能访问foreach
。添加 .ToList()
可以,但更好的是,将其放入线程安全列表也可以解决问题。
public ConcurrentDictionary<TMessage, string> MessageList { get; set; }
public Handler(ConcurrentDictionary<TMessage, string> messageList)
{
MessageList = messageList;
}
...
private void ReceivedMessages(TMessage Message)
{
MessageList.TryAdd(message, "");
}