如何使用 .NET Core 2.1 和 Stream 在 Cosmos DB 中批量插入 API
How to Bulk Insert in Cosmos DB with .NET Core 2.1 and Stream API
我正在尝试使用 this CosmosDB sample 实现批量插入。此示例使用 .NET Core 3.* 创建并支持 System.Text.Json.
当使用 CreateItemAsync 方法时,它完美地工作:
var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
foreach (var entity in entities)
{
entity.Id = GenerateId(entity);
var requestOptions = new ItemRequestOptions();
requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
}
await Task.WhenAll(concurrentTasks);
但是,我正在尝试看看是否可以通过将数据直接流式传输到 CosmosDB 来减少 RU 的数量,希望 CosmosDB 不会因为反序列化 JSON 本身而向我收费。
我正在使用 .NET Core 2.1 和 Newtonsoft.Json。这是我的代码,没有 return 成功状态代码。响应头中的子状态码为“0”。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new Dictionary<PartitionKey, Stream>();
foreach (var notification in notifications)
{
MemoryStream ms = new MemoryStream();
StreamWriter writer = new StreamWriter(ms);
JsonTextWriter jsonWriter = new JsonTextWriter(writer);
JsonSerializer ser = new JsonSerializer();
ser.Serialize(jsonWriter, notification);
await jsonWriter.FlushAsync();
await writer.FlushAsync();
itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
response.StatusCode: BadRequest
response.ErrorMessage: 空
我假设我没有以正确的方式序列化到流中。有人知道吗?
更新
我发现新的 System.Text.Json 包也实现了 .NET Standard 2.0,所以我从 NUget 安装了它。现在我可以复制前面提到的 Github 中的示例代码。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();
foreach (var notification in notifications)
{
notification.id = $"{notification.UserId}:{Guid.NewGuid()}";
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, notification);
itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (var item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
我仔细检查了 BulkInsert 是否已启用(否则第一种方法也不起作用)。仍然有 BadRequest 和 errorMessage 的 NULL。
我还检查了尽管有 BadRequest,但数据没有添加到容器中。
流似乎不可读。因此是错误的请求。
我会稍微修改 MemoryStream
的创建方式:
foreach (var notification in notifications)
{
itemsToInsert.Add(new PartitionKey(notification.UserId), new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
}
当然,我正在使用 Newtonsoft.json 进行 jsonConvert。
我发现了问题。
我已经使用以下选项设置了我的 Cosmos 上下文:
var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
因此 CamelCase
约定。在我的第一个(工作)代码示例中,我会让 CosmosDB 上下文反序列化为 JSON。他会使用这个 CamelCase 约定进行序列化,所以我的 PartionKey UserId
会被序列化为 userId
.
但是,为了减少一些 RU,我将使用让我负责序列化的 CreateItemStreamAsync
。还有一个错误,我的 属性 定义如下:
public int UserId { get; set; }
所以他会被连载到json UserId: 1
.
然而,分区键定义为/userId
。因此,如果我添加 JsonPropertyName 属性,它将起作用:
[JsonPropertyName("userId")]
public int UserId { get; set; }
...如果只有一条错误消息会告诉我这一点就好了。
使用此 CreateItemStream
方法可节省约 3% RU。但是,随着时间的推移,我猜这会慢慢节省一些 RU。
我正在尝试使用 this CosmosDB sample 实现批量插入。此示例使用 .NET Core 3.* 创建并支持 System.Text.Json.
当使用 CreateItemAsync 方法时,它完美地工作:
var concurrentTasks = new List<Task<ItemResponse<Notification>>>();
foreach (var entity in entities)
{
entity.Id = GenerateId(entity);
var requestOptions = new ItemRequestOptions();
requestOptions.EnableContentResponseOnWrite = false; // We don't need to get the entire body returend.
concurrentTasks.Add(Container.CreateItemAsync(entity, new PartitionKey(entity.UserId), requestOptions));
}
await Task.WhenAll(concurrentTasks);
但是,我正在尝试看看是否可以通过将数据直接流式传输到 CosmosDB 来减少 RU 的数量,希望 CosmosDB 不会因为反序列化 JSON 本身而向我收费。
我正在使用 .NET Core 2.1 和 Newtonsoft.Json。这是我的代码,没有 return 成功状态代码。响应头中的子状态码为“0”。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new Dictionary<PartitionKey, Stream>();
foreach (var notification in notifications)
{
MemoryStream ms = new MemoryStream();
StreamWriter writer = new StreamWriter(ms);
JsonTextWriter jsonWriter = new JsonTextWriter(writer);
JsonSerializer ser = new JsonSerializer();
ser.Serialize(jsonWriter, notification);
await jsonWriter.FlushAsync();
await writer.FlushAsync();
itemsToInsert.Add(new PartitionKey(notification.UserId), ms);
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (KeyValuePair<PartitionKey, Stream> item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Value, item.Key)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
response.StatusCode: BadRequest response.ErrorMessage: 空
我假设我没有以正确的方式序列化到流中。有人知道吗?
更新
我发现新的 System.Text.Json 包也实现了 .NET Standard 2.0,所以我从 NUget 安装了它。现在我可以复制前面提到的 Github 中的示例代码。
Notification[] notifications = entities.ToArray();
var itemsToInsert = new List<Tuple<PartitionKey, Stream>>();
foreach (var notification in notifications)
{
notification.id = $"{notification.UserId}:{Guid.NewGuid()}";
MemoryStream stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, notification);
itemsToInsert.Add(new Tuple<PartitionKey, Stream>(new PartitionKey(notification.RoleId), stream));
}
List<Task> tasks = new List<Task>(notifications.Length);
foreach (var item in itemsToInsert)
{
tasks.Add(Container.CreateItemStreamAsync(item.Item2, item.Item1)
.ContinueWith((Task<ResponseMessage> task) =>
{
using (ResponseMessage response = task.Result)
{
if (!response.IsSuccessStatusCode)
{
Console.WriteLine($"Received {response.StatusCode} ({response.ErrorMessage}).");
}
else
{
}
}
}));
}
// Wait until all are done
await Task.WhenAll(tasks);
我仔细检查了 BulkInsert 是否已启用(否则第一种方法也不起作用)。仍然有 BadRequest 和 errorMessage 的 NULL。
我还检查了尽管有 BadRequest,但数据没有添加到容器中。
流似乎不可读。因此是错误的请求。
我会稍微修改 MemoryStream
的创建方式:
foreach (var notification in notifications)
{
itemsToInsert.Add(new PartitionKey(notification.UserId), new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(notification))));
}
当然,我正在使用 Newtonsoft.json 进行 jsonConvert。
我发现了问题。
我已经使用以下选项设置了我的 Cosmos 上下文:
var cosmosSerializationOptions = new CosmosSerializationOptions();
cosmosSerializationOptions.PropertyNamingPolicy = CosmosPropertyNamingPolicy.CamelCase;
CosmosClientOptions cosmosClientOptions = new CosmosClientOptions();
cosmosClientOptions.SerializerOptions = cosmosSerializationOptions;
因此 CamelCase
约定。在我的第一个(工作)代码示例中,我会让 CosmosDB 上下文反序列化为 JSON。他会使用这个 CamelCase 约定进行序列化,所以我的 PartionKey UserId
会被序列化为 userId
.
但是,为了减少一些 RU,我将使用让我负责序列化的 CreateItemStreamAsync
。还有一个错误,我的 属性 定义如下:
public int UserId { get; set; }
所以他会被连载到json UserId: 1
.
然而,分区键定义为/userId
。因此,如果我添加 JsonPropertyName 属性,它将起作用:
[JsonPropertyName("userId")]
public int UserId { get; set; }
...如果只有一条错误消息会告诉我这一点就好了。
使用此 CreateItemStream
方法可节省约 3% RU。但是,随着时间的推移,我猜这会慢慢节省一些 RU。