Async update or insert of MongoDB documents using the .NET driver
I have a document like this:
public class SomeDocument
{
public Guid Id { get; set; }
public string PropertyA { get; set; }
public string PropertyB { get; set; }
}
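Now I have two different services (A and B) that update PropertyA and PropertyB respectively and work asynchronously. This means I don't know which service will finish first and should create the document, and which one should update it.
So, to update (or create) the document, I currently use the following code in service A: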
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");
await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);
and the following code in service B:
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");
await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);
Everything looks fine, but sometimes, when both services run at the same time, I get the following error:
Unhandled Exception: MongoDB.Driver.MongoCommandException: Command findAndModify failed: E11000 duplicate key error collection: someDocuments index: _id_ dup key: { : BinData(3, B85ED193195A274DA94BC86B655B4509) }.
at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.ProcessReply(ConnectionId connectionId, ReplyMessage`1 reply)
at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.<ExecuteAsync>d__11.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Servers.Server.ServerChannel.<ExecuteProtocolAsync>d__26`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.CommandOperationBase`1.<ExecuteProtocolAsync>d__29.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.WriteCommandOperation`1.<ExecuteAsync>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.FindAndModifyOperationBase`1.<ExecuteAsync>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.OperationExecutor.<ExecuteWriteOperationAsync>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.MongoCollectionImpl`1.<ExecuteWriteOperationAsync>d__62`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at CVSP.MongoDbStore.MongoDbWriteModelFacade.<AddRecordField>d__6.MoveNext() in D:\Projects\Test\Source\MongoDbStore\WriteModel\MongoDbWriteModelFacade.cs:line 58
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.<>c.<ThrowAsync>b__6_1(Object state)
at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
How should I insert/update the document in this case?
UPDATE
An extension method did the trick:
public static async Task<TProjection> FindOneAndUpdateWithConcurrencyAsync<TDocument, TProjection>(this IMongoCollection<TDocument> collection, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
}
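catch (MongoException)
{
// The concurrent upsert lost the insert race; pause without blocking the thread, then retry once.
await Task.Delay(10, cancellationToken);
return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
}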
}
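Using try/catch looked strange at first, and I did not like it from the beginning, but after reading https://docs.mongodb.com/manual/reference/method/db.collection.findAndModify/#upsert-and-unique-index all my doubts disappeared.
Well, this is a synchronization issue, and unfortunately there is no easy solution. To find a hack, let's dissect what might be happening on the back end.
Suppose we have two threads (services) trying to upsert the same document.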
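The single retry above can be generalized into a small bounded retry helper. The following is only a sketch under assumptions: `RetryHelper`, `RetryAsync`, `shouldRetry`, `maxAttempts` and `delay` are hypothetical names that do not come from the MongoDB driver, and the helper deliberately knows nothing about MongoDB so that it can be read and run on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryHelper
{
    // Runs `operation`; if it throws an exception accepted by `shouldRetry`,
    // waits `delay` and tries again, up to `maxAttempts` attempts in total.
    // The exception from the final attempt is not caught and reaches the caller.
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> shouldRetry,
        int maxAttempts = 3,
        TimeSpan? delay = null,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        var wait = delay ?? TimeSpan.FromMilliseconds(10);

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await operation().ConfigureAwait(false);
            }
            catch (Exception ex) when (attempt < maxAttempts && shouldRetry(ex))
            {
                // Non-blocking pause (unlike Thread.Sleep) before the next attempt.
                await Task.Delay(wait, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

With the driver, a call might look like `RetryHelper.RetryAsync(() => collection.FindOneAndUpdateAsync(filter, update, options), ex => ex is MongoCommandException)`. Narrowing the predicate to the duplicate key error only (E11000 in the trace above) is probably worthwhile, but how to detect it depends on the driver version you use, so treat that part as an assumption to verify.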
t1: 00:00:00.250 -> find document with Id (1)
t2: 00:00:00.255 -> find document with id (1)
t1: 00:00:00.260 -> No document found
t2: 00:00:00.262 -> No document found
t1: 00:00:00.300 -> Insert a document with Id(1)
t2: 00:00:00.300 -> Insert a document with Id(1)
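Bingo... we have an exception. Both threads tried to insert a document with the same Id.
Now, what can we do here?
Let's turn this shortcoming to our advantage. Catch the exception and call the upsert again. This time it will find the document and update it.
I modified the code for ServiceA and ServiceB as follows and tried inserting 10000 documents in a tight loop: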
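To make the timeline and the retry idea concrete, here is a small in-memory simulation. It is a sketch only, not what the server does internally: `Doc`, `UpsertRaceDemo` and `NaiveUpsert` are hypothetical names, a `ConcurrentDictionary` stands in for the collection with its unique `_id` index, and a `Barrier` forces both writers to miss on the find step before either one inserts.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class Doc
{
    public string PropertyA;
    public string PropertyB;
}

public static class UpsertRaceDemo
{
    public static int DuplicateKeyErrors;

    public static Doc Run()
    {
        DuplicateKeyErrors = 0;
        var store = new ConcurrentDictionary<int, Doc>();
        using (var bothMissed = new Barrier(2))
        {
            var a = Task.Run(() => UpsertWithRetry(store, bothMissed, d => d.PropertyA = "Property A value"));
            var b = Task.Run(() => UpsertWithRetry(store, bothMissed, d => d.PropertyB = "Property B value"));
            Task.WaitAll(a, b);
        }
        return store[1];
    }

    private static void UpsertWithRetry(ConcurrentDictionary<int, Doc> store, Barrier bothMissed, Action<Doc> set)
    {
        try
        {
            NaiveUpsert(store, bothMissed, set);
        }
        catch (InvalidOperationException)
        {
            Interlocked.Increment(ref DuplicateKeyErrors);
            // The other writer has inserted the document, so this attempt finds and updates it.
            NaiveUpsert(store, bothMissed, set);
        }
    }

    private static void NaiveUpsert(ConcurrentDictionary<int, Doc> store, Barrier bothMissed, Action<Doc> set)
    {
        Doc existing;
        if (store.TryGetValue(1, out existing))
        {
            set(existing); // found: plain update
            return;
        }

        // Not found: wait until the other writer has also missed, as in the timeline.
        bothMissed.SignalAndWait();

        var fresh = new Doc();
        set(fresh);
        if (!store.TryAdd(1, fresh))
        {
            throw new InvalidOperationException("E11000 duplicate key (simulated)");
        }
    }
}
```

Calling `UpsertRaceDemo.Run()` should leave both properties set on the stored document, with exactly one writer having hit the simulated duplicate key error and recovered through its retry.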
public async Task ServiceA(Guid id)
{
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");
var options = new UpdateOptions() { IsUpsert = true };
var database = _client.GetDatabase("Whosebug");
var collection = database.GetCollection<SomeDocument>(CollectionName,
new MongoCollectionSettings
{
WriteConcern = WriteConcern.W1
});
await collection.UpdateOneAsync(filter, update, options);
}
public async Task ServiceB(Guid id)
{
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");
var options = new UpdateOptions() { IsUpsert = true };
var database = _client.GetDatabase("Whosebug");
var collection = database.GetCollection<SomeDocument>(CollectionName,
new MongoCollectionSettings
{
WriteConcern = WriteConcern.W1
});
await collection.UpdateOneAsync(filter, update, options);
}
Here is my launch code. Not perfect, but it works.
for (var i = 0; i < 10000; i++)
{
    var guid = Guid.NewGuid();
    // Task.Run unwraps the async lambda, so WaitAll really waits for both upserts.
    // (new Task(async x => ...) turns the lambda into async void and completes early.)
    var tasks = new[]
    {
        Task.Run(async () =>
        {
            var p = new Program();
            try
            {
                await p.ServiceA(guid);
            }
            catch (MongoWriteException)
            {
                // Lost the insert race: the document exists now, so the retry updates it.
                await Task.Delay(5);
                await p.ServiceA(guid);
            }
        }),
        Task.Run(async () =>
        {
            var p = new Program();
            try
            {
                await p.ServiceB(guid);
            }
            catch (MongoWriteException)
            {
                // Same single retry for the other writer.
                await Task.Delay(5);
                await p.ServiceB(guid);
            }
        })
    };
    Task.WaitAll(tasks);
}