从数据库连接的 RabbitMq 消息中获取租户 ID

Get Tenant Id from RabbitMq Message for Db Connection

我有一个微服务架构,其中 ASP.Net 核心应用程序和 RabbitMq 作为微服务之间的事件总线。
我也想支持多租户。
因此,我在 Startup.cs 中定义了以下依赖项注入服务,以根据用户的租户 ID 在每个请求上打开与数据库的连接。

services.AddScoped<IDocumentSession>(ds =>
            {
                var store = ds.GetRequiredService<IDocumentStore>();
                var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
                var tenant = httpContextAccessor?.HttpContext?.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
                return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
            });

问题出在服务处理事件总线消息(如 UserUpdatedEvent)时。
在那种情况下,当它尝试打开 Db 连接时,它显然没有来自 http 上下文的用户信息。

如何在使用 RabbitMq 注入作用域服务和处理事件时 send/access 各个用户的租户 ID?

或改述我的问题: 执行依赖注入代码时,有什么方法可以访问 RabbitMQ 消息(例如它的 headers)?

你不能

或者可能,但如果您的设计取决于 HTTP 上下文,则不会。正如 .NET documentation on service lifetime 所述:

Scoped lifetime services are created once per client request (connection).

因此,从您的 (HTTP) 服务的角度来看,该请求是一个入口点,它使用容器魔法,通过全局 HTTP 上下文,根据请求设置您的数据库,在您的任何业务逻辑之前。这似乎不是最佳设计选择,尤其是如果您计划在 HTTP 请求之外使用相同的逻辑。

相比之下,你的消息消费服务是long-running;在这个生命周期中,如果您的连接设置需要来自每条消息(租户 ID)的信息,您不能仅仅依赖依赖注入。

“正确”的方法是不依赖 HTTP 上下文中的全局状态来建立数据库连接。改为设置适用于所有租户的数据库上下文。

由于没有HttpContext,因为RabbitMq 请求不是Http 请求,正如@istepaniuk 的回答中所指出的,我创建了自己的上下文并将其命名为AmqpContext:

public interface IAmqpContext
    {
        void ClearHeaders();
        void AddHeaders(IDictionary<string, object> headers);
        string GetHeaderByKey(string headerKey);
    }

    public class AmqpContext : IAmqpContext
    {
        private readonly Dictionary<string, object> _headers;

        public AmqpContext()
        {
            _headers = new Dictionary<string, object>();
        }

        public void ClearHeaders()
        {
            _headers.Clear();
        }

        public void AddHeaders(IDictionary<string, object> headers)
        {
            foreach (var header in headers)
                _headers.Add(header.Key, header.Value);
        }

        public string GetHeaderByKey(string headerKey) 
        {
            if (_headers.TryGetValue(headerKey, out object headerValue))
            {
                return Encoding.Default.GetString((byte[])headerValue);
            }
            return null;
        }
    }

并且在发送 RabbitMq 消息时,我通过 headers 发送租户 ID,如下所示:

                    var properties = channel.CreateBasicProperties();
                    if (tenantId != null)
                    {
                        var headers = new Dictionary<string, object>
                        {
                            { "tid", tenantId }
                        };
                        properties.Headers = headers;
                    }

                    channel.BasicPublish(exchange: BROKER_NAME,
                                     routingKey: eventName,
                                     mandatory: true,
                                     basicProperties: properties,
                                     body: body);

然后在接收服务时,我将 AmqpContext 注册为 Startup.cs 中的范围服务:

services.AddScoped<IAmqpContext, AmqpContext>();

当接收到 RabbitMq 消息时,在消费者通道中,一个范围和 Amqp 上下文被创建:

consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);
                var properties = ea.BasicProperties;

                using (var scope = _serviceProvider.CreateScope())
                        {
                            var amqpContext = scope.ServiceProvider.GetService<IAmqpContext>();
                            if (amqpContext != null)
                            {
                                amqpContext.ClearHeaders();
                                if (properties.Headers != null && amqpContext != null)
                                {
                                    amqpContext.AddHeaders(properties.Headers);
                                }
                            }
                            var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }

                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

然后当创建范围内的 Db 连接服务时(请参阅我的问题)我可以从消息 headers:

访问租户 ID
    services.AddScoped<IDocumentSession>(ds =>
    {
        var store = ds.GetRequiredService<IDocumentStore>();
        string tenant = null;
        var httpContextAccessor = ds.GetRequiredService<IHttpContextAccessor>();
        if (httpContextAccessor.HttpContext != null)
        {
            tenant = httpContextAccessor.HttpContext.User?.Claims.FirstOrDefault(c => c.Type == "tid")?.Value;
        }
        else
        {
            var amqpContext = ds.GetRequiredService<IAmqpContext>();
            tenant = amqpContext.GetHeaderByKey("tid");
        }
        return tenant != null ? store.OpenSession(tenant) : store.OpenSession();
    });