如何使用 .NET Core 2.1 和 Stream 在 Cosmos DB 中批量插入 API

How to Bulk Insert in Cosmos DB with .NET Core 2.1 and Stream API

我正在尝试使用 this CosmosDB sample 实现批量插入。此示例使用 .NET Core 3.* 创建并支持 System.Text.Json.

当使用 CreateItemAsync 方法时,它完美地工作:

    var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
    foreach (var entity in entities)
    {
        entity.Id = GenerateId(entity);

        var requestOptions = new ItemRequestOptions();
        requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
        concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
    }

    await Task.WhenAll(concurrentTasks);

但是,我正在尝试看看是否可以通过将数据直接流式传输到 CosmosDB 来减少 RU 的数量,希望 CosmosDB 不会因为反序列化 JSON 本身而向我收费。

我正在使用 .NET Core 2.1 和 Newtonsoft.Json。这是我的代码,没有 return 成功状态代码。响应头中的子状态码为“0”。

    Notification[] notifications = entities.ToArray();
    var itemsToInsert = new Dictionary<PartitionKey, Stream>();

    foreach (var notification in notifications)
    {
        MemoryStream ms = new MemoryStream();
        StreamWriter writer = new StreamWriter(ms);
        JsonTextWriter jsonWriter = new JsonTextWriter(writer);
        JsonSerializer ser = new JsonSerializer();
                
        ser.Serialize(jsonWriter, notification);

        await jsonWriter.FlushAsync();
        await writer.FlushAsync();

        itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
    }

    List<Task> tasks = new List<Task>(notifications.Length);
    foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
    {
        tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
            .ContinueWith((Task<ResponseMessage> task) =>
            {
                using (ResponseMessage response = task.Result)
                {
                    if (!response.IsSuccessStatusCode)
                    {
                        Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                    }
                    else
                    {
                    }
                }
            }));
    }

    // Wait until all are done
    await Task.WhenAll(tasks);

response.StatusCode: BadRequest response.ErrorMessage: 空

我假设我没有以正确的方式序列化到流中。有人知道吗?

更新

我发现新的 System.Text.Json 包也实现了 .NET Standard 2.0,所以我从 NUget 安装了它。现在我可以复制前面提到的 Github 中的示例代码。

        Notification[] notifications = entities.ToArray();
        var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();

        foreach (var notification in notifications)
        {
            notification.id = $"{notification.UserId}:{Guid.NewGuid()}";

            MemoryStream stream = new MemoryStream();
            await JsonSerializer.SerializeAsync(stream, notification);

            itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
        }

        List<Task> tasks = new List<Task>(notifications.Length);
        foreach (var item in itemsToInsert)
        {
            tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
                .ContinueWith((Task<ResponseMessage> task) =>
                {
                    using (ResponseMessage response = task.Result)
                    {
                        if (!response.IsSuccessStatusCode)
                        {
                            Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
                        }
                        else
                        {
                        }
                    }
                }));
        }

        // Wait until all are done
        await Task.WhenAll(tasks);

我仔细检查了 BulkInsert 是否已启用(否则第一种方法也不起作用)。仍然有 BadRequest 和 errorMessage 的 NULL。

我还检查了尽管有 BadRequest,但数据没有添加到容器中。

流似乎不可读。因此是错误的请求。 我会稍微修改 MemoryStream 的创建方式:

foreach (var notification in notifications)
    {
        
        itemsToInsert.Add(new PartitionKey(notification.UserId), new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
    }

当然,我正在使用 Newtonsoft.json 进行 jsonConvert。

我发现了问题。

我已经使用以下选项设置了我的 Cosmos 上下文:

var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;

CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;

因此 CamelCase 约定。在我的第一个(工作)代码示例中,我会让 CosmosDB 上下文反序列化为 JSON。他会使用这个 CamelCase 约定进行序列化,所以我的 PartionKey UserId 会被序列化为 userId.

但是,为了减少一些 RU,我将使用让我负责序列化的 CreateItemStreamAsync。还有一个错误,我的 属性 定义如下:

public int UserId { get; set; }

所以他会被连载到json UserId: 1.

然而,分区键定义为/userId。因此,如果我添加 JsonPropertyName 属性,它将起作用:

[JsonPropertyName("userId")]
public int UserId { get; set; } 

...如果只有一条错误消息会告诉我这一点就好了。

使用此 CreateItemStream 方法可节省约 3% RU。但是,随着时间的推移,我猜这会慢慢节省一些 RU。