使用 .NET Core SDK 创建过滤的服务总线订阅
Create filtered Service Bus subscription using .NET Core SDKs
我想在 .NET Core 2.1 应用程序中使用 Azure 服务总线。我熟悉 Nuget 包 Microsoft.Azure.ServiceBus
附带的 SDK,我目前正在使用它编写主题并从那里接收消息。
我现在想要的是为每个客户创建关于该主题的订阅。重点是我想使用 SqlFilter
创建过滤订阅。这意味着,我必须通过代码创建订阅。
顺便说一句:据我所知,在代码中执行此操作的唯一其他方法是使用 ARM 模板进行部署,因为门户不允许我创建过滤订阅。
我明白了,Microsoft.Azure.ServiceBus
无法创建资源,所以我去了 Nuget 包 Microsoft.Azure.Management.ServiceBus
。所以我现在可以像这样从代码创建一个新的订阅:
private static async Task CreateSubscriptionAsync()
{
var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
var token = await context.AcquireTokenAsync(
"https://management.core.windows.net/",
new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
var creds = new TokenCredentials(token.AccessToken);
using (var sbClient = new ServiceBusManagementClient(creds)
{
SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
})
{
var queueParams = new SBSubscription
{
LockDuration = TimeSpan.FromSeconds(10)
};
await sbClient.Subscriptions.CreateOrUpdateAsync(
Configuration["AppSettings:ServiceBus:ResourceGroup"],
Configuration["AppSettings:ServiceBus:Namespace"],
Configuration["AppSettings:ServiceBus:TopicName"],
"mysub",
queueParams);
}
}
这行得通,所以我有一个新的订阅,我也可以将其删除。但是现在我可以在哪里定义过滤器? SBSubscription
类型不包含定义过滤器的选项,并且 sbClient.Subscriptions.CreateOrUpdateAsync
.
中没有方法重载
我发现在代码中执行此操作的唯一方法是基于旧的 Nuget WindowsAzure.ServiceBus
,它不适用于 .NET Core。所以我会像这样使用 NamespaceManager
:
var manager = NamespaceManager.CreateFromConnectionString("MyConnectionString");
var rule = new RuleDescription("property > 4");
var sub = manager.CreateSubscriptionAsync(new SubscriptionDescription("mytopic", "mysub"), rule);
如果没有这个功能,完整的主题对我来说似乎毫无用处,我不敢相信这意味着 Azure 服务总线还没有为 .NET Core 做好准备。
编辑:Arunprabhu 建议的解决方案
我只是想完整地展示解决方案。
- 添加对 NuGet 的引用
Microsoft.Azure.Management.ServiceBus
。
- 添加对 NuGet 的引用
Microsoft.Azure.ServiceBus
。
- 按如下方式创建子:
private static async Task CreateSubscriptionAsync()
{
// this is to show that it works with any subscription-name
var subName = Guid.NewGuid().ToString("N");
// create the subscription using Azure Management API
var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
var token = await context.AcquireTokenAsync(
"https://management.core.windows.net/",
new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
var creds = new TokenCredentials(token.AccessToken);
using (var sbClient = new ServiceBusManagementClient(creds)
{
SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
})
{
var queueParams = new SBSubscription
{
LockDuration = TimeSpan.FromSeconds(10)
};
await sbClient.Subscriptions.CreateOrUpdateAsync(
Configuration["AppSettings:ServiceBus:ResourceGroup"],
Configuration["AppSettings:ServiceBus:Namespace"],
Configuration["AppSettings:ServiceBus:TopicName"],
subName,
queueParams);
}
// add filter-rule using Azure ServiceBus API
var client = new SubscriptionClient(ServiceBusConnectionString, Configuration["AppSettings:ServiceBus:TopicName"], subName);
await client.AddRuleAsync("default", new SqlFilter("1=1"));
// That's it
}
在Microsoft.Azure.ServiceBus的SubscriptionClient下也有类似的规则管理方法。查看 here 样本。
I understood, that Microsoft.Azure.ServiceBus is not able to create ressources
从 Microsoft.Azure.ServiceBus 的 3.1.0 预览版开始,您可以使用 ManagementClient
创建实体,而无需 Microsoft.Azure.Management.ServiceBus 库。它有一个 CreateSubscriptionAsync()
的重载,将 RuleDescription
创建为默认规则。或者,不提供 RuleDescription
将自动为您设置默认规则。
解释 Sean Feldman 的回答,因为他的示例 link 现在已损坏,您可以这样做:
public static async Task CreateSubscription(
string connection,
string topicPath,
string subscriptionName,
string matchExpression,
string ruleDescription,
bool checkForExisting = false,
CancellationToken cancellationToken = default(CancellationToken)
)
{
if (checkForExisting)
{
var exists = (await new ManagementClient(connection)
.GetSubscriptionsAsync(topicPath, cancellationToken: cancellationToken))
.Any(sub => sub.SubscriptionName == subscriptionName);
if (exists) return;
}
var subscriptionDescription = new SubscriptionDescription(topicPath, subscriptionName);
var rule = new RuleDescription(ruleDescription, new SqlFilter(matchExpression));
await new ManagementClient(connection).CreateSubscriptionAsync(subscriptionDescription, rule, cancellationToken);
}
为简单起见,我删除了强制性输入验证,例如空字符串或无效字符。我还提供了一种检查订阅是否已存在的方法。理想情况下,您只执行一次该检查,然后在内存中保留一个 运行 选项卡,这样您就不会每次都执行该 API 调用。
对于您的 matchExpression
,您将根据发布消息时添加到消息中的用户属性,使用 SQLish 语法为该订阅定义过滤器。
实现看起来像:
await CreateSubscription(
Configuration.GetConnectionString("MyServiceBusConnectionString"),
"myTopicName",
"mySubscription",
"MyUserProperty='MyValue'",
"MyRule",
true
)
您可以使用Microsoft.Azure.Management.ServiceBus
包创建过滤器,它们是Rules的一部分。
await client.Rules.CreateOrUpdateAsync(resourceGroupName, namespaceName,
topicName, subscriptionName, filterName, new Rule {
SqlFilter = new SqlFilter(sql)
});
您在下面来自 Azure 门户的屏幕截图中看到过滤器名称和 SQL 字符串:
我想在 .NET Core 2.1 应用程序中使用 Azure 服务总线。我熟悉 Nuget 包 Microsoft.Azure.ServiceBus
附带的 SDK,我目前正在使用它编写主题并从那里接收消息。
我现在想要的是为每个客户创建关于该主题的订阅。重点是我想使用 SqlFilter
创建过滤订阅。这意味着,我必须通过代码创建订阅。
顺便说一句:据我所知,在代码中执行此操作的唯一其他方法是使用 ARM 模板进行部署,因为门户不允许我创建过滤订阅。
我明白了,Microsoft.Azure.ServiceBus
无法创建资源,所以我去了 Nuget 包 Microsoft.Azure.Management.ServiceBus
。所以我现在可以像这样从代码创建一个新的订阅:
private static async Task CreateSubscriptionAsync()
{
var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
var token = await context.AcquireTokenAsync(
"https://management.core.windows.net/",
new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
var creds = new TokenCredentials(token.AccessToken);
using (var sbClient = new ServiceBusManagementClient(creds)
{
SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
})
{
var queueParams = new SBSubscription
{
LockDuration = TimeSpan.FromSeconds(10)
};
await sbClient.Subscriptions.CreateOrUpdateAsync(
Configuration["AppSettings:ServiceBus:ResourceGroup"],
Configuration["AppSettings:ServiceBus:Namespace"],
Configuration["AppSettings:ServiceBus:TopicName"],
"mysub",
queueParams);
}
}
这行得通,所以我有一个新的订阅,我也可以将其删除。但是现在我可以在哪里定义过滤器? SBSubscription
类型不包含定义过滤器的选项,并且 sbClient.Subscriptions.CreateOrUpdateAsync
.
我发现在代码中执行此操作的唯一方法是基于旧的 Nuget WindowsAzure.ServiceBus
,它不适用于 .NET Core。所以我会像这样使用 NamespaceManager
:
var manager = NamespaceManager.CreateFromConnectionString("MyConnectionString");
var rule = new RuleDescription("property > 4");
var sub = manager.CreateSubscriptionAsync(new SubscriptionDescription("mytopic", "mysub"), rule);
如果没有这个功能,完整的主题对我来说似乎毫无用处,我不敢相信这意味着 Azure 服务总线还没有为 .NET Core 做好准备。
编辑:Arunprabhu 建议的解决方案
我只是想完整地展示解决方案。
- 添加对 NuGet 的引用
Microsoft.Azure.Management.ServiceBus
。 - 添加对 NuGet 的引用
Microsoft.Azure.ServiceBus
。 - 按如下方式创建子:
private static async Task CreateSubscriptionAsync()
{
// this is to show that it works with any subscription-name
var subName = Guid.NewGuid().ToString("N");
// create the subscription using Azure Management API
var context = new AuthenticationContext($"https://login.microsoftonline.com/{Configuration["AppSettings:AzureManagement:TenantId"]}");
var token = await context.AcquireTokenAsync(
"https://management.core.windows.net/",
new ClientCredential(Configuration["AppSettings:AzureManagement:ClientId"], Configuration["AppSettings:AzureManagement:ClientSecret"]));
var creds = new TokenCredentials(token.AccessToken);
using (var sbClient = new ServiceBusManagementClient(creds)
{
SubscriptionId = VariableHelper.Configuration["AppSettings:AzureManagement:AzureSubscriptionId"]
})
{
var queueParams = new SBSubscription
{
LockDuration = TimeSpan.FromSeconds(10)
};
await sbClient.Subscriptions.CreateOrUpdateAsync(
Configuration["AppSettings:ServiceBus:ResourceGroup"],
Configuration["AppSettings:ServiceBus:Namespace"],
Configuration["AppSettings:ServiceBus:TopicName"],
subName,
queueParams);
}
// add filter-rule using Azure ServiceBus API
var client = new SubscriptionClient(ServiceBusConnectionString, Configuration["AppSettings:ServiceBus:TopicName"], subName);
await client.AddRuleAsync("default", new SqlFilter("1=1"));
// That's it
}
在Microsoft.Azure.ServiceBus的SubscriptionClient下也有类似的规则管理方法。查看 here 样本。
I understood, that Microsoft.Azure.ServiceBus is not able to create ressources
从 Microsoft.Azure.ServiceBus 的 3.1.0 预览版开始,您可以使用 ManagementClient
创建实体,而无需 Microsoft.Azure.Management.ServiceBus 库。它有一个 CreateSubscriptionAsync()
的重载,将 RuleDescription
创建为默认规则。或者,不提供 RuleDescription
将自动为您设置默认规则。
解释 Sean Feldman 的回答,因为他的示例 link 现在已损坏,您可以这样做:
public static async Task CreateSubscription(
string connection,
string topicPath,
string subscriptionName,
string matchExpression,
string ruleDescription,
bool checkForExisting = false,
CancellationToken cancellationToken = default(CancellationToken)
)
{
if (checkForExisting)
{
var exists = (await new ManagementClient(connection)
.GetSubscriptionsAsync(topicPath, cancellationToken: cancellationToken))
.Any(sub => sub.SubscriptionName == subscriptionName);
if (exists) return;
}
var subscriptionDescription = new SubscriptionDescription(topicPath, subscriptionName);
var rule = new RuleDescription(ruleDescription, new SqlFilter(matchExpression));
await new ManagementClient(connection).CreateSubscriptionAsync(subscriptionDescription, rule, cancellationToken);
}
为简单起见,我删除了强制性输入验证,例如空字符串或无效字符。我还提供了一种检查订阅是否已存在的方法。理想情况下,您只执行一次该检查,然后在内存中保留一个 运行 选项卡,这样您就不会每次都执行该 API 调用。
对于您的 matchExpression
,您将根据发布消息时添加到消息中的用户属性,使用 SQLish 语法为该订阅定义过滤器。
实现看起来像:
await CreateSubscription(
Configuration.GetConnectionString("MyServiceBusConnectionString"),
"myTopicName",
"mySubscription",
"MyUserProperty='MyValue'",
"MyRule",
true
)
您可以使用Microsoft.Azure.Management.ServiceBus
包创建过滤器,它们是Rules的一部分。
await client.Rules.CreateOrUpdateAsync(resourceGroupName, namespaceName,
topicName, subscriptionName, filterName, new Rule {
SqlFilter = new SqlFilter(sql)
});
您在下面来自 Azure 门户的屏幕截图中看到过滤器名称和 SQL 字符串: