如何从找到匹配记录的第一次调用中进行多个异步调用和 return 数据,然后停止剩余的异步调用

How to make multiple async calls and return data from first call that finds matching record then stop remaining async calls

我需要通过遍历连接字符串来进行多个数据库调用。数据库中只有 1 条匹配记录,如果我找到匹配记录,那么我可以 return 数据并取消其他异步调用。

using (var Contexts = instContextfactory.GetContextList())
{
    foreach(var context in Contexts.GetContextList())
    {    
        // how do I make all the calls and return data from the first call that finds data and continue with further process.(don't care about other calls if any single call finds data.           
        context.Insurance.GetInsuranceByANI(ani);
    }
}

通过 ANI 获取保险

public Task<IEnumerable<Insurance>> GetInsuranceByANI(string ani)
{
    using (ITransaction transaction = Session.Value.BeginTransaction())
    {
        transaction.Rollback();
        IDbCommand command = new SqlCommand();
        command.Connection = Session.Value.Connection;

        transaction.Enlist(command);

        string storedProcName = "spGetInsurance";

        command.CommandText = storedProcName;
        command.Parameters.Add(new SqlParameter("@ANI", SqlDbType.Char, 0, ParameterDirection.Input, false, 0, 0, null, DataRowVersion.Default, ani));

        var rdr = command.ExecuteReader();
        return Task.FromResult(MapInsurance(rdr));
    }
}

例如:我正在循环使用 5(a、b、c、d、e)个不同的数据库连接字符串。我需要对所有 5 个数据库进行异步调用。如果我在 db : b 中找到匹配的记录,那么我可以 return 该数据并继续下一步,并且可以停止调用其他 dbs

获取值后立即返回如何。不允许流程向前移动并打破循环。

using (var Contexts = instContextfactory.GetContextList())
    {
           foreach(var context in Contexts.GetContextList())
           {    
               // how do I make all the calls and return data from the first call that finds data and continue with the further process.(don't care about other calls if any single call finds data.           
                var result = await context.Insurance.GetInsuranceByANI(ani);

                if(result.Any())
                {
                    return result.First();
                }
           }
    }

为简单起见,您应该先将 GetInsuranceByANI 方法改回同步状态。稍后我们将生成任务以异步调用它。

public IEnumerable<Insurance> GetInsuranceByANI(string ani)
{
    using (ITransaction transaction = Session.Value.BeginTransaction())
    {
        transaction.Rollback();
        IDbCommand command = new SqlCommand();
        command.Connection = Session.Value.Connection;

        transaction.Enlist(command);

        string storedProcName = "spGetInsurance";

        command.CommandText = storedProcName;
        command.Parameters.Add(new SqlParameter("@ANI", SqlDbType.Char, 0, ParameterDirection.Input, false, 0, 0, null, DataRowVersion.Default, ani));

        var rdr = command.ExecuteReader();
        return MapInsurance(rdr);
    }
}

现在实现异步搜索所有数据库的方法。我们将为每个数据库创建一个任务,运行 在 thread-pool 线程中。这是值得商榷的,但我们正在努力让事情变得简单。我们还实例化了一个 CancellationTokenSource,并将其 Token 传递给所有 Task.Run 方法。这只会确保在我们得到结果后,不会再启动更多任务。如果 thread-pool 中的可用线程多于要搜索的数据库,则所有任务将立即开始,取消令牌实际上不会取消任何内容。换句话说,无论如何,所有启动的查询都将完成。这显然是一种资源浪费,但我们再次努力让事情变得简单。

开始任务后,我们将进入等待下一个任务完成的循环(使用方法 Task.WhenAny)。如果找到结果,我们取消令牌并 return 结果。如果未找到结果,我们将继续循环以获得下一个结果。如果所有任务都完成但我们仍然没有结果,我们 return null.

async Task<IEnumerable<Insurance>> SearchAllByANI(string ani)
{
    var tasks = new HashSet<Task<IEnumerable<Insurance>>>();
    var cts = new CancellationTokenSource();
    using (var Contexts = instContextfactory.GetContextList())
    {
        foreach (var context in Contexts.GetContextList())
        {
            tasks.Add(Task.Run(() =>
            {
                return context.Insurance.GetInsuranceByANI(ani);
            }, cts.Token));
        }
    }
    while (tasks.Count > 0)
    {
        var task = await Task.WhenAny(tasks);
        var result = await task;
        if (result != null && result.Any())
        {
            cts.Cancel();
            return result;
        }
        tasks.Remove(task);
    }
    return null;
}

用法示例:

IEnumerable<Insurance> result = await SearchAllByANI("12345");
if (result == null)
{
    // Nothing fould
}
else
{
    // Do something with result
}