C#并行添加到队列并监听队列传播

C# add to queue in parallel and listen for queue propagation

我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我组织的结构,我必须单独调用数据库以检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。

目前,我正在使用 Parallel.ForEach() 查询数据库并将所有元素添加到 ConcurrentList 中。从数据库返回所有数据后,我将以同步方式执行操作。

除了任何明显的问题之外,我不喜欢的一件事是在内存中保留一个大列表并且基本上被阻止,直到冗长的数据库进程完成。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行的或异步的。我只需要它能够在队列中添加内容或队列不为空时进行监听。

并行进程:

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
    {
        logger.Info("Fetching Data");
        var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
        Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
        {
            try
            {
                var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);

                concurrentCombinedData.Add(new CombinedAccountInfo()
                {
                    AccountName = r.AccountName,
                    AccountId = r.AccountId,
                    LastLoginDate = r.LastLoginDate,
                    AccountHandle = r.AccountHandle,
                    UserPreferences = userPrefs 
                });
            }
            catch (Exception e)
            {
                logger.Error(e);
            }
        });

        return concurrentCombinedTransaction;
    }

我阅读了一些关于 Dataflow 的文章,并看到了一些关于 Reactive Extensions 的文章。但是,我似乎可以找到多个生产者向单个消费者提供信息的更简单示例。对于如何更好地实现最终目标的任何建议或想法,我们将不胜感激。

已解决

我将使用 Scott Hannen 提供的答案。因为操作很小,而且不是很密集,所以每个进程都可以处理它,而不是试图将所有内容重新绑定到列表中。

如果您想要的是在从数据库中检索帐户时对每个帐户进行操作,那么您完全可以这样做,而不是将元素添加到 ConcurrentBag<CombinedAccountInfo>

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
    List<AccountInfo> accountList, 
    string dbConnName,
    Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)

然后,当您从数据库中获取每个元素时,

doSomethingWithTheAccountInfo(accountInfo);

尽管我真的认为您应该一次查询所有用户首选项,因为这会提高数据库的性能(真的 BIG TIME),如果您想要类似这个:

public void Answer<T>(List<Guid> ids)
{
    var stack = new ConcurrentStack<T>();

    Parallel.ForEach(ids, (id) =>
    {
        T value = GetData<T>(id);

        stack.Push(value);
    });

    Parallel.For(0, ids.Count, (i) =>
    {
        T item;
        while (!stack.TryPop(out item))
        {
            // sleep
        }
        Process(item);
    });
}

但是我有没有提到,我认为你不应该去那里?