Azure 服务总线主题架构

Azure Service Bus Topic Architecture

我一直在完成 Microsoft 提供的 eShopOnContainers 项目,总体上磨练我的微服务技能。其中一个重要概念是引入事件总线。我选择尝试使用 Azure 服务总线,但我对该平台的体验有限。

在手动创建主题、订阅等之后,我已经设法获得项目 运行,但这引发了一些问题:

订阅应用程序是否有责任在 Azure 中创建自己的订阅?例如在启动时?

从概念上讲,主题代表不同的事件堆栈,对吗?例如。客户、订单等?或者它们打算成为领域事件边界?例如。在此应用程序中,'eShop' 将是主题。

Azure 部署是另一个主题,但与服务总线配置相关,是否有任何推荐的技术在源代码管理中进行管理?

非常感谢任何见解。

Is it not the responsibility of the subscribing application to create it's own Subscription in Azure? e.g. on startup?

没错。详细信息取决于您使用的基础消息服务。在 Azure 服务总线的情况下,每个服务在启动时都会订阅它感兴趣的事件。例如,Orderingsubscribe during startup to the events it handles. The project has a IEventBusSubscriptionsManager contract to be implemented specifically for each messaging service. For Azure Service Bus implementation 每个服务都有一个物理订阅,它感兴趣的每个事件都由一个规则,按服务总线消息 Label 的值过滤消息(Label 包含事件名称)。

Conceptually, Topics represent different event stacks, correct? E.g. Customers, Ordering, etc? Or are they intended to be domain event boundaries? E.g. in this application, 'eShop' would be the topic

话题是发帖的要点。您可以为每个服务使用一个主题,但这意味着订阅者需要知道是什么服务发布了这些事件。或者,可能更好的选择是让一个主题 为您的所有服务所知,并将事件发布到该主题。现在称它为“Events”。对各种事件感兴趣的每个服务都会创建一个订阅。订阅将能够获得发布到 Events 主题的 任何 消息(事件),但实际上应该只 "catch" 并传递它需要的事件(阅读 "subscribed to").这就是过滤的用武之地。通过创建过滤器 (RuleDescriptions),每个服务的给定订阅在代理上声明它将接收什么消息。

Azure deployments is a whole other topic, but related to the Service Bus configuration, are there any recommended techniques for managing that within source control?

几个选项。

  1. 在 运行 时基于代码创建实体(主题、带规则的订阅、队列)。
  2. 像版本控制系统中的代码一样,使用 ARM 模板和版本捕获拓扑。
  3. 使用 Azure CLI 和版本控制您的脚本。

根据反馈,我能够基于 eShop Event Bus 拼凑出一个粗略的解决方案,以在启动时创建 Azure 订阅:

 public EventBusServiceBus(IServiceBusPersisterConnection serviceBusPersisterConnection,
        ILogger<EventBusServiceBus> logger, IEventBusSubscriptionsManager subsManager, string subscriptionClientName,
        ILifetimeScope autofac, AzureUserCredentials userCredentials, string subscriptionId, string resourceGroupName, string serviceBusName, string topicName)
    {
        _serviceBusPersisterConnection = serviceBusPersisterConnection;
        _logger = logger;
        _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();

        _subscriptionClient = new Microsoft.Azure.ServiceBus.SubscriptionClient(serviceBusPersisterConnection.ServiceBusConnectionStringBuilder,
            subscriptionClientName);
        _autofac = autofac;

        var credentials = SdkContext.AzureCredentialsFactory.FromServicePrincipal(
          userCredentials.ClientId, userCredentials.ClientSecret, userCredentials.TenantId, AzureEnvironment.FromName(userCredentials.EnvironmentName));

        var azure = Azure
                .Configure()
                .WithLogLevel(HttpLoggingDelegatingHandler.Level.Basic)
                .Authenticate(credentials)
                .WithSubscription(subscriptionId);

        var nm = azure.ServiceBusNamespaces.GetByResourceGroup(resourceGroupName, serviceBusName);

        var topic = nm.Topics.GetByName(topicName);

        if (topic == null)
            throw new ArgumentException($"Topic {topic} does not exist.", nameof(topic));

        Microsoft.Azure.Management.ServiceBus.Fluent.ISubscription subscription = null;
        try
        { subscription = topic.Subscriptions.GetByName(subscriptionClientName); }
        catch { }

        if (subscription == null)
        {
            logger.LogInformation($"Creating Azure Subscription '{subscriptionClientName}'");
            topic.Subscriptions.Define(subscriptionClientName).WithDeleteOnIdleDurationInMinutes(5).Create();
        }
        else
        {
            logger.LogInformation($"Azure Subscription '{subscriptionClientName}' already exists. Reusing.");
        }

        RemoveDefaultRule();
        RegisterSubscriptionClientMessageHandler();
    }