UnitOfWork 和 DbContext:DI 的线程安全
UnitOfWork and DbContext: thread safety with DI
我正在开发托管 IHostedService
:
的 .NET Core 2.2 控制台应用程序
public class MqttClientHostedService : IHostedService, IDisposable
{
[...]
public MqttClientHostedService(
ILogger<MqttClientHostedService> logger,
IOptions<MqttClientConfiguration> mqttConfiguration,
IPositionService positionService)
{
this.logger = logger;
this.config = mqttConfiguration;
this.positionService = positionService;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
mqttClient = new MqttFactory().CreateMqttClient();
mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
mqttClient.ApplicationMessageReceived +=
async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);
await mqttClient.ConnectAsync(
new MqttClientOptionsBuilder()
.WithTcpServer(config.Value.Host, config.Value.Port).Build());
}
private async Task MqttClient_ApplicationMessageReceived(
object sender, MqttApplicationMessageReceivedEventArgs e)
{
string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
await positionService.HandleMessage(message);
}
[...]
}
这个 IPositionService
是一个管理器,它检查消息并检查它是否可以保存在我们的数据库中:
public class PositionService : IPositionService
{
[...]
public PositionService(
IUnitOfWork unitOfWork, ILogger<PositionService> logger)
{
this.unitOfWork = unitOfWork;
this.logger = logger;
}
public async Task HandleMessage(string message)
{
Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);
[...]
await unitOfWork.EntityRepository.UpdateAsync(entity);
await unitOfWork.Save();
}
[...]
}
IUnitOfWork
是 Entity Framework 核心 DbContext
的包装(请不要评判我,我有理由这样做):
public class UnitOfWork : IUnitOfWork
{
[...]
public UnitOfWork(MyContext myContext)
{
this.myContext = myContext;
EntityRepository = new EFRepository<Entity>(myContext);
}
public async Task Save()
{
await myContext.SaveChangesAsync();
}
}
EFRepository<T>
,它实现了 IRepository<T>
接口,是 DbSet<T>
的包装器(再次强调,请不要评判我)。这里没有相关代码。
控制台应用程序的 Program.cs 配置如下:
[...]
.ConfigureServices((hostContext, services) =>
{
services.AddDbContext<MyContext>(
c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
ServiceLifetime.Transient);
services.AddTransient<IPositionService, PositionService>();
services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
services.AddTransient<IUnitOfWork, UnitOfWork>();
services.AddHostedService<MqttClientHostedService>();
[...]
});
问题是 PositionService.HandleMessage
每秒被调用多次,并且 DbContext
不是线程安全的,我收到此错误消息:
A second operation started on this context before a previous operation
completed.
我通过从 PositionService
的依赖项中删除 IUnitOfWork
解决了这个问题,而不是注入 IServiceScopeFactory
,然后执行:
using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();
[...]
}
这种方式可行,但我不喜欢。这似乎是个把戏,我不喜欢我的 PositionService
知道 Dependency Injection
并且必须处理范围的事实。
我的问题是:有没有更好的方法来解决这个问题而不用碰我的类?我应该让整个 UnitOfWork
线程安全吗?或者不使用 DI 手动创建它?
问题的根源在于 MyContext
在以下对象图中作为俘虏依赖项被俘虏:
MqttClientHostedService
-> PositionService
-> UnitOfWork
-> MyContext
此图表中的所有类型都注册为 Transient
,但充当 托管服务 的服务(例如您的 MqttClientHostedService
)仍然仅被解析在应用程序的持续时间内进行一次并无限期缓存。这有效地使他们成为单身人士。
换句话说,MyContext
意外地被单个 MqttClientHostedService
保持活动状态,并且因为多个消息可以并行传入,所以您自己就有了竞争条件。
解决方案是让每个 ApplicationMessageReceived
事件 运行 在其自己独特的小气泡(作用域)中并从该气泡中解析一个新的 IPositionService
。例如:
public class MqttClientHostedService : IHostedService, IDisposable
{
[...]
public MqttClientHostedService(
ILogger<MqttClientHostedService> logger,
IOptions<MqttClientConfiguration> mqttConfiguration,
IServiceProvider provider)
{
this.logger = logger;
this.config = mqttConfiguration;
this.provider = provider;
}
[...]
private async Task MqttClient_ApplicationMessageReceived(
object sender, MqttApplicationMessageReceivedEventArgs e)
{
using (var scope = provider.CreateScope())
{
positionService = scope.ServiceProvider
.GetRequiredService<IPositionService>();
string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
await positionService.HandleMessage(message);
}
}
[...]
}
我正在开发托管 IHostedService
:
public class MqttClientHostedService : IHostedService, IDisposable
{
[...]
public MqttClientHostedService(
ILogger<MqttClientHostedService> logger,
IOptions<MqttClientConfiguration> mqttConfiguration,
IPositionService positionService)
{
this.logger = logger;
this.config = mqttConfiguration;
this.positionService = positionService;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
mqttClient = new MqttFactory().CreateMqttClient();
mqttClient.Connected += async (s, e) => await MqttClient_Connected(s, e);
mqttClient.ApplicationMessageReceived +=
async (s, e) => await MqttClient_ApplicationMessageReceived(s, e);
await mqttClient.ConnectAsync(
new MqttClientOptionsBuilder()
.WithTcpServer(config.Value.Host, config.Value.Port).Build());
}
private async Task MqttClient_ApplicationMessageReceived(
object sender, MqttApplicationMessageReceivedEventArgs e)
{
string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
await positionService.HandleMessage(message);
}
[...]
}
这个 IPositionService
是一个管理器,它检查消息并检查它是否可以保存在我们的数据库中:
public class PositionService : IPositionService
{
[...]
public PositionService(
IUnitOfWork unitOfWork, ILogger<PositionService> logger)
{
this.unitOfWork = unitOfWork;
this.logger = logger;
}
public async Task HandleMessage(string message)
{
Entity entity = await unitOfWork.EntityRepository.GetByMessage(message);
[...]
await unitOfWork.EntityRepository.UpdateAsync(entity);
await unitOfWork.Save();
}
[...]
}
IUnitOfWork
是 Entity Framework 核心 DbContext
的包装(请不要评判我,我有理由这样做):
public class UnitOfWork : IUnitOfWork
{
[...]
public UnitOfWork(MyContext myContext)
{
this.myContext = myContext;
EntityRepository = new EFRepository<Entity>(myContext);
}
public async Task Save()
{
await myContext.SaveChangesAsync();
}
}
EFRepository<T>
,它实现了 IRepository<T>
接口,是 DbSet<T>
的包装器(再次强调,请不要评判我)。这里没有相关代码。
控制台应用程序的 Program.cs 配置如下:
[...]
.ConfigureServices((hostContext, services) =>
{
services.AddDbContext<MyContext>(
c => c.UseSqlServer("[...]", options => options.UseNetTopologySuite()),
ServiceLifetime.Transient);
services.AddTransient<IPositionService, PositionService>();
services.AddTransient(typeof(IRepository<>), typeof(EFRepository<>));
services.AddTransient<IUnitOfWork, UnitOfWork>();
services.AddHostedService<MqttClientHostedService>();
[...]
});
问题是 PositionService.HandleMessage
每秒被调用多次,并且 DbContext
不是线程安全的,我收到此错误消息:
A second operation started on this context before a previous operation completed.
我通过从 PositionService
的依赖项中删除 IUnitOfWork
解决了这个问题,而不是注入 IServiceScopeFactory
,然后执行:
using (IServiceScope serviceScope = serviceScopeFactory.CreateScope())
{
IUnitOfWork unitOfWork = serviceScope.ServiceProvider.GetService<IUnitOfWork>();
[...]
}
这种方式可行,但我不喜欢。这似乎是个把戏,我不喜欢我的 PositionService
知道 Dependency Injection
并且必须处理范围的事实。
我的问题是:有没有更好的方法来解决这个问题而不用碰我的类?我应该让整个 UnitOfWork
线程安全吗?或者不使用 DI 手动创建它?
问题的根源在于 MyContext
在以下对象图中作为俘虏依赖项被俘虏:
MqttClientHostedService
-> PositionService
-> UnitOfWork
-> MyContext
此图表中的所有类型都注册为 Transient
,但充当 托管服务 的服务(例如您的 MqttClientHostedService
)仍然仅被解析在应用程序的持续时间内进行一次并无限期缓存。这有效地使他们成为单身人士。
换句话说,MyContext
意外地被单个 MqttClientHostedService
保持活动状态,并且因为多个消息可以并行传入,所以您自己就有了竞争条件。
解决方案是让每个 ApplicationMessageReceived
事件 运行 在其自己独特的小气泡(作用域)中并从该气泡中解析一个新的 IPositionService
。例如:
public class MqttClientHostedService : IHostedService, IDisposable
{
[...]
public MqttClientHostedService(
ILogger<MqttClientHostedService> logger,
IOptions<MqttClientConfiguration> mqttConfiguration,
IServiceProvider provider)
{
this.logger = logger;
this.config = mqttConfiguration;
this.provider = provider;
}
[...]
private async Task MqttClient_ApplicationMessageReceived(
object sender, MqttApplicationMessageReceivedEventArgs e)
{
using (var scope = provider.CreateScope())
{
positionService = scope.ServiceProvider
.GetRequiredService<IPositionService>();
string message = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
await positionService.HandleMessage(message);
}
}
[...]
}