How can I use LINQ in a CosmosDB SDK v3.0 async query?

I have been following the official documentation here: https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-get-started#Query

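However, I can't work out how to use a LINQ expression in place of the SQL string. I tried GetItemLinqQueryable, but I'm not sure I'm using it correctly, and it isn't asynchronous either.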

var db = Client.GetDatabase(databaseId);
var container = db.GetContainer(containerId);

var q = container.GetItemLinqQueryable<Person>();
var result = q.Where(p => p.Name == "Name").ToList();

Is this the right way to use LINQ with Cosmos v3, and how can I make it asynchronous?

You would use ToFeedIterator() together with FeedIterator<T>.ReadNextAsync().

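// ToFeedIterator() is an extension method from the Microsoft.Azure.Cosmos.Linq namespace.
var db = Client.GetDatabase(databaseId);
var container = db.GetContainer(containerId);

var q = container.GetItemLinqQueryable<Person>();
var iterator = q.Where(p => p.Name == "Name").ToFeedIterator();
// ReadNextAsync() returns a single page; check iterator.HasMoreResults to read further pages.
var results = await iterator.ReadNextAsync();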
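A single ReadNextAsync() call returns only one page of results, so a query that matches many documents needs a loop over HasMoreResults. The following is a minimal sketch of such a loop, assuming the Microsoft.Azure.Cosmos v3 package is referenced; the class name CosmosQueryExtensions and the method name ReadAllAsync are hypothetical and not part of the SDK.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;

// Hypothetical helper, not part of the Cosmos SDK.
public static class CosmosQueryExtensions
{
    // Executes a Cosmos LINQ query asynchronously and collects every page into one list.
    // The query must originate from Container.GetItemLinqQueryable<T>(),
    // otherwise ToFeedIterator() throws.
    public static async Task<List<T>> ReadAllAsync<T>(
        this IQueryable<T> query,
        CancellationToken cancellationToken = default)
    {
        var results = new List<T>();
        FeedIterator<T> iterator = query.ToFeedIterator();

        // Each ReadNextAsync call is one round trip that returns one page.
        while (iterator.HasMoreResults)
        {
            FeedResponse<T> page = await iterator.ReadNextAsync(cancellationToken);
            results.AddRange(page);
        }

        return results;
    }
}
```

With this in place, the query from the question could be written as `var people = await container.GetItemLinqQueryable<Person>().Where(p => p.Name == "Name").ReadAllAsync();`. Buffering everything in a list is a simplification; for large result sets it may be better to process each page as it arrives.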

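If your application follows a layered architecture and you want to give your domain layer full control over the query, you can wrap the Cosmos IQueryable<Person> in a custom IQueryProvider whose queryables also implement IAsyncEnumerable, for example as shown in the layers below.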

By doing so, you hide the implementation details of asynchronously iterating over the results from your domain layer.

Persistence layer

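public class PersonRepository
{
    private readonly Container _cosmosContainer;

    public PersonRepository(Container cosmosContainer) => _cosmosContainer = cosmosContainer;

    public IQueryable<Person> Persons => _cosmosContainer.GetItemLinqQueryable<Person>().ToCosmosAsyncQueryable();
}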

Domain layer

var persons = await _personRepository.Persons
    .Where(p => p.Name == "Name")
    .AsAsyncQueryable()
    .ToListAsync(cancellationToken);
  • ToListAsync is provided by System.Linq.Async, which can be referenced from your domain layer.
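ToListAsync buffers the whole result set before returning. When the domain layer would rather handle items as they arrive, the same IAsyncEnumerable can be consumed with `await foreach` (C# 8 or later). The sketch below is one way to do that; AsyncStreaming and ForEachAsync are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical domain-layer helper.
public static class AsyncStreaming
{
    // Invokes `handle` for each item as soon as it is produced, without building a list.
    public static async Task ForEachAsync<T>(
        IAsyncEnumerable<T> source,
        Action<T> handle,
        CancellationToken cancellationToken = default)
    {
        // WithCancellation passes the token on to GetAsyncEnumerator(cancellationToken).
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            handle(item);
        }
    }
}
```

A call could look like `await AsyncStreaming.ForEachAsync(_personRepository.Persons.Where(p => p.Name == "Name").AsAsyncQueryable(), p => Console.WriteLine(p.Name), cancellationToken);`.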

Domain layer extensions

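public static class AsyncQueryableExtensions // class name is illustrative; extension methods must live in a static class
{
    // The cast succeeds only for queryables created by ToCosmosAsyncQueryable(); anything else throws InvalidCastException.
    public static IAsyncEnumerable<T> AsAsyncQueryable<T>(this IQueryable<T> queryable)
    {
        return (IAsyncEnumerable<T>)queryable;
    }
}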
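The direct cast in AsAsyncQueryable fails with an InvalidCastException when it is handed a queryable that was not produced by the persistence layer's wrapper, for example an in-memory list in a unit test. A guarded variant can make that failure easier to diagnose. This is only a sketch; the names CheckedAsyncQueryableExtensions and AsAsyncQueryableChecked are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical alternative to the plain cast.
public static class CheckedAsyncQueryableExtensions
{
    // Returns the queryable as IAsyncEnumerable<T> when it supports async iteration,
    // and throws a descriptive exception otherwise.
    public static IAsyncEnumerable<T> AsAsyncQueryableChecked<T>(this IQueryable<T> queryable)
    {
        if (queryable is null)
        {
            throw new ArgumentNullException(nameof(queryable));
        }

        if (queryable is IAsyncEnumerable<T> asyncEnumerable)
        {
            return asyncEnumerable;
        }

        throw new InvalidOperationException(
            $"{queryable.GetType().Name} does not implement IAsyncEnumerable<{typeof(T).Name}>.");
    }
}
```

Calling `new[] { 1, 2 }.AsQueryable().AsAsyncQueryableChecked()` throws InvalidOperationException, because the in-memory queryable returned by AsQueryable() does not implement IAsyncEnumerable<T>.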

Persistence layer extensions

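using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using Microsoft.Azure.Cosmos.Linq; // ToFeedIterator()

internal static class CosmosAsyncQueryableExtensions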
{
    internal static IQueryable<T> ToCosmosAsyncQueryable<T>(this IOrderedQueryable<T> source)
    {
        return new CosmosAsyncQueryable<T>(source);
    }
}

internal class CosmosAsyncQueryable<TResult> : IEnumerable<TResult>, IQueryable<TResult>, IAsyncEnumerable<TResult>
{
    private readonly IQueryable<TResult> _queryable;

    public CosmosAsyncQueryable(IQueryable<TResult> queryable)
    {
        _queryable = queryable;
        Provider = new CosmosAsyncQueryableProvider(queryable.Provider);
    }

    public Type ElementType => typeof(TResult);

    public Expression Expression => _queryable.Expression;

    public IQueryProvider Provider { get; }

    public IEnumerator<TResult> GetEnumerator() => _queryable.GetEnumerator();

    IEnumerator IEnumerable.GetEnumerator() => _queryable.GetEnumerator();

    public async IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        var iterator = _queryable.ToFeedIterator();

        while (iterator.HasMoreResults)
        {
            foreach (var item in await iterator.ReadNextAsync(cancellationToken))
            {
                yield return item;
            }
        }
    }
}

internal class CosmosAsyncQueryableProvider : IQueryProvider
{
    private readonly IQueryProvider _provider;

    public CosmosAsyncQueryableProvider(IQueryProvider provider) => _provider = provider;

    public IQueryable<TElement> CreateQuery<TElement>(Expression expression) =>
        new CosmosAsyncQueryable<TElement>(_provider.CreateQuery<TElement>(expression));

    public IQueryable CreateQuery(Expression expression) => CreateQuery<object>(expression);

    public object Execute(Expression expression) => _provider.Execute(expression);

    public TResult Execute<TResult>(Expression expression) => _provider.Execute<TResult>(expression);
}