Cloudformation - 如何在代码中设置 SNS 订阅的过滤策略?
Cloudformation - how to set filter policy of SNS subscription in code?
更新:Cloudformation 现在支持 SNS 主题过滤器,所以这个问题不再相关,不需要自定义插件或代码。
我正在构建一个包含多个 SNS 主题和多个 Lambda 的系统,每个 Lambda 都从其分配的 SQS 队列中读取消息。 SQS 队列订阅了 SNS 主题,但也有一个过滤策略,因此消息将最终出现在相关的 SQS 队列中。
当我在 AWS 控制台中设置订阅时,它运行良好。
现在我正尝试在我的代码中执行相同的操作,但 AWS Cloudformation 文档没有描述如何向订阅添加过滤策略。基于 python examples here,我尝试了以下方法:
StopOperationSubscription:
Type: "AWS::SNS::Subscription"
Properties:
Protocol: sqs
TopicArn:
Ref: StatusTopic
Endpoint:
Fn::GetAtt: [StopActionQueue, Arn]
FilterPolicy: '{"value": ["stop"]}'
但是我得到这个错误:
An error occurred: StopOperationSubscription - Encountered unsupported property FilterPolicy.
如何使用 CloudFormation 设置我需要的过滤策略?如果不支持,您有什么替代建议?
我希望它在我部署无服务器应用程序时自动设置,无需手动步骤。
我是这样修复的:
serverless.yml
plugins:
- serverless-plugin-scripts
custom:
scripts:
commands:
update-topic-filters: sls invoke local -f configureSubscriptions --path resources/lambdaTopicFilters.json
hooks:
before:deploy:finalize: sls update-topic-filters
functions:
configureSubscriptions:
handler: src/configurationLambdas/configureSubscriptions.main
# Only invoked when deploying - therefore, no permissions or triggers are needed.
configureSubscriptions.js
import AWS from 'aws-sdk'
const nameFromArn = arn => arn.split(':').pop()
const lambdaNameFromArn = arn =>
nameFromArn(arn)
.split('-')
.pop()
exports.main = async event => {
const sns = new AWS.SNS({ apiVersion: '2010-03-31' })
const params = {}
const { Topics } = await sns.listTopics(params).promise()
for (const { TopicArn } of Topics) {
const topicName = nameFromArn(TopicArn)
const filtersForTopic = event[topicName]
if (!filtersForTopic) {
continue
}
const { Subscriptions } = await sns.listSubscriptionsByTopic({ TopicArn }).promise()
for (const { Protocol, Endpoint, SubscriptionArn } of Subscriptions) {
if (Protocol === 'lambda') {
const lambdaName = lambdaNameFromArn(Endpoint)
const filterForLambda = filtersForTopic[lambdaName]
if (!filterForLambda) {
continue
}
const setPolicyParams = {
AttributeName: 'FilterPolicy',
SubscriptionArn,
AttributeValue: JSON.stringify(filterForLambda),
}
await sns.setSubscriptionAttributes(setPolicyParams).promise()
// eslint-disable-next-line no-console
console.log('Subscription filters has been set')
}
}
}
}
顶级是不同的主题名称,下一级是lambda名称,第三级是相关订阅的过滤策略:
lambdaTopicFilters.json
{
"user-event": {
"activateUser": {
"valueType": ["status"],
"value": ["awaiting_activation"]
},
"findActivities": {
"messageType": ["event"],
"value": ["awaiting_activity_data"],
"valueType": ["status"]
}
},
"system-event": {
"startStopProcess": {
"valueType": ["status"],
"value": ["activated", "aborted", "limit_reached"]
}
}
}
Cloudformation 昨天才开始支持 FilterPolicy。我也纠结了一段时间:)
语法
JSON
{
"Type" : "AWS::SNS::Subscription",
"Properties" : {
"DeliveryPolicy" : JSON object,
"Endpoint" : String,
"FilterPolicy" : JSON object,
"Protocol" : String,
"RawMessageDelivery" : Boolean,
"Region" : String,
"TopicArn" : String
}
}
YAML
Type: "AWS::SNS::Subscription"
Properties:
DeliveryPolicy: JSON object
Endpoint: String
FilterPolicy: JSON object
Protocol: String
RawMessageDelivery: Boolean,
Region: String
TopicArn: String
参考:
如果您使用的是无服务器,它现在原生支持 sns 过滤器
functions:
pets:
handler: pets.handler
events:
- sns:
topicName: pets
filterPolicy:
pet:
- dog
- cat
https://serverless.com/framework/docs/providers/aws/events/sns#setting-a-filter-policy
更新:Cloudformation 现在支持 SNS 主题过滤器,所以这个问题不再相关,不需要自定义插件或代码。
我正在构建一个包含多个 SNS 主题和多个 Lambda 的系统,每个 Lambda 都从其分配的 SQS 队列中读取消息。 SQS 队列订阅了 SNS 主题,但也有一个过滤策略,因此消息将最终出现在相关的 SQS 队列中。
当我在 AWS 控制台中设置订阅时,它运行良好。
现在我正尝试在我的代码中执行相同的操作,但 AWS Cloudformation 文档没有描述如何向订阅添加过滤策略。基于 python examples here,我尝试了以下方法:
StopOperationSubscription:
Type: "AWS::SNS::Subscription"
Properties:
Protocol: sqs
TopicArn:
Ref: StatusTopic
Endpoint:
Fn::GetAtt: [StopActionQueue, Arn]
FilterPolicy: '{"value": ["stop"]}'
但是我得到这个错误:
An error occurred: StopOperationSubscription - Encountered unsupported property FilterPolicy.
如何使用 CloudFormation 设置我需要的过滤策略?如果不支持,您有什么替代建议?
我希望它在我部署无服务器应用程序时自动设置,无需手动步骤。
我是这样修复的:
serverless.yml
plugins:
- serverless-plugin-scripts
custom:
scripts:
commands:
update-topic-filters: sls invoke local -f configureSubscriptions --path resources/lambdaTopicFilters.json
hooks:
before:deploy:finalize: sls update-topic-filters
functions:
configureSubscriptions:
handler: src/configurationLambdas/configureSubscriptions.main
# Only invoked when deploying - therefore, no permissions or triggers are needed.
configureSubscriptions.js
import AWS from 'aws-sdk'
const nameFromArn = arn => arn.split(':').pop()
const lambdaNameFromArn = arn =>
nameFromArn(arn)
.split('-')
.pop()
exports.main = async event => {
const sns = new AWS.SNS({ apiVersion: '2010-03-31' })
const params = {}
const { Topics } = await sns.listTopics(params).promise()
for (const { TopicArn } of Topics) {
const topicName = nameFromArn(TopicArn)
const filtersForTopic = event[topicName]
if (!filtersForTopic) {
continue
}
const { Subscriptions } = await sns.listSubscriptionsByTopic({ TopicArn }).promise()
for (const { Protocol, Endpoint, SubscriptionArn } of Subscriptions) {
if (Protocol === 'lambda') {
const lambdaName = lambdaNameFromArn(Endpoint)
const filterForLambda = filtersForTopic[lambdaName]
if (!filterForLambda) {
continue
}
const setPolicyParams = {
AttributeName: 'FilterPolicy',
SubscriptionArn,
AttributeValue: JSON.stringify(filterForLambda),
}
await sns.setSubscriptionAttributes(setPolicyParams).promise()
// eslint-disable-next-line no-console
console.log('Subscription filters has been set')
}
}
}
}
顶级是不同的主题名称,下一级是lambda名称,第三级是相关订阅的过滤策略:
lambdaTopicFilters.json
{
"user-event": {
"activateUser": {
"valueType": ["status"],
"value": ["awaiting_activation"]
},
"findActivities": {
"messageType": ["event"],
"value": ["awaiting_activity_data"],
"valueType": ["status"]
}
},
"system-event": {
"startStopProcess": {
"valueType": ["status"],
"value": ["activated", "aborted", "limit_reached"]
}
}
}
Cloudformation 昨天才开始支持 FilterPolicy。我也纠结了一段时间:)
语法
JSON
{
"Type" : "AWS::SNS::Subscription",
"Properties" : {
"DeliveryPolicy" : JSON object,
"Endpoint" : String,
"FilterPolicy" : JSON object,
"Protocol" : String,
"RawMessageDelivery" : Boolean,
"Region" : String,
"TopicArn" : String
}
}
YAML
Type: "AWS::SNS::Subscription"
Properties:
DeliveryPolicy: JSON object
Endpoint: String
FilterPolicy: JSON object
Protocol: String
RawMessageDelivery: Boolean,
Region: String
TopicArn: String
参考:
如果您使用的是无服务器,它现在原生支持 sns 过滤器
functions:
pets:
handler: pets.handler
events:
- sns:
topicName: pets
filterPolicy:
pet:
- dog
- cat
https://serverless.com/framework/docs/providers/aws/events/sns#setting-a-filter-policy