C#并行添加到队列并监听队列传播
C# add to queue in parallel and listen for queue propagation
我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我组织的结构,我必须单独调用数据库以检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。
目前,我正在使用 Parallel.ForEach() 查询数据库并将所有元素添加到 ConcurrentList 中。从数据库返回所有数据后,我将以同步方式执行操作。
除了任何明显的问题之外,我不喜欢的一件事是在内存中保留一个大列表并且基本上被阻止,直到冗长的数据库进程完成。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行的或异步的。我只需要它能够在队列中添加内容或队列不为空时进行监听。
并行进程:
public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
{
logger.Info("Fetching Data");
var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
{
try
{
var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);
concurrentCombinedData.Add(new CombinedAccountInfo()
{
AccountName = r.AccountName,
AccountId = r.AccountId,
LastLoginDate = r.LastLoginDate,
AccountHandle = r.AccountHandle,
UserPreferences = userPrefs
});
}
catch (Exception e)
{
logger.Error(e);
}
});
return concurrentCombinedTransaction;
}
我阅读了一些关于 Dataflow 的文章,并看到了一些关于 Reactive Extensions 的文章。但是,我似乎可以找到多个生产者向单个消费者提供信息的更简单示例。对于如何更好地实现最终目标的任何建议或想法,我们将不胜感激。
已解决
我将使用 Scott Hannen 提供的答案。因为操作很小,而且不是很密集,所以每个进程都可以处理它,而不是试图将所有内容重新绑定到列表中。
如果您想要的是在从数据库中检索帐户时对每个帐户进行操作,那么您完全可以这样做,而不是将元素添加到 ConcurrentBag<CombinedAccountInfo>
。
public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
List<AccountInfo> accountList,
string dbConnName,
Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)
然后,当您从数据库中获取每个元素时,
doSomethingWithTheAccountInfo(accountInfo);
尽管我真的认为您应该一次查询所有用户首选项,因为这会提高数据库的性能(真的 BIG TIME),如果您想要类似这个:
public void Answer<T>(List<Guid> ids)
{
var stack = new ConcurrentStack<T>();
Parallel.ForEach(ids, (id) =>
{
T value = GetData<T>(id);
stack.Push(value);
});
Parallel.For(0, ids.Count, (i) =>
{
T item;
while (!stack.TryPop(out item))
{
// sleep
}
Process(item);
});
}
但是我有没有提到,我认为你不应该去那里?
我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我组织的结构,我必须单独调用数据库以检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。
目前,我正在使用 Parallel.ForEach() 查询数据库并将所有元素添加到 ConcurrentList 中。从数据库返回所有数据后,我将以同步方式执行操作。
除了任何明显的问题之外,我不喜欢的一件事是在内存中保留一个大列表并且基本上被阻止,直到冗长的数据库进程完成。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行的或异步的。我只需要它能够在队列中添加内容或队列不为空时进行监听。
并行进程:
public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
{
logger.Info("Fetching Data");
var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
{
try
{
var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);
concurrentCombinedData.Add(new CombinedAccountInfo()
{
AccountName = r.AccountName,
AccountId = r.AccountId,
LastLoginDate = r.LastLoginDate,
AccountHandle = r.AccountHandle,
UserPreferences = userPrefs
});
}
catch (Exception e)
{
logger.Error(e);
}
});
return concurrentCombinedTransaction;
}
我阅读了一些关于 Dataflow 的文章,并看到了一些关于 Reactive Extensions 的文章。但是,我似乎可以找到多个生产者向单个消费者提供信息的更简单示例。对于如何更好地实现最终目标的任何建议或想法,我们将不胜感激。
已解决
我将使用 Scott Hannen 提供的答案。因为操作很小,而且不是很密集,所以每个进程都可以处理它,而不是试图将所有内容重新绑定到列表中。
如果您想要的是在从数据库中检索帐户时对每个帐户进行操作,那么您完全可以这样做,而不是将元素添加到 ConcurrentBag<CombinedAccountInfo>
。
public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
List<AccountInfo> accountList,
string dbConnName,
Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)
然后,当您从数据库中获取每个元素时,
doSomethingWithTheAccountInfo(accountInfo);
尽管我真的认为您应该一次查询所有用户首选项,因为这会提高数据库的性能(真的 BIG TIME),如果您想要类似这个:
public void Answer<T>(List<Guid> ids)
{
var stack = new ConcurrentStack<T>();
Parallel.ForEach(ids, (id) =>
{
T value = GetData<T>(id);
stack.Push(value);
});
Parallel.For(0, ids.Count, (i) =>
{
T item;
while (!stack.TryPop(out item))
{
// sleep
}
Process(item);
});
}
但是我有没有提到,我认为你不应该去那里?