Azure 函数 - Python - ServiceBus 输出绑定 - 设置自定义属性
Azure Function - Python - ServiceBus Output Binding - Setting Custom Properties
我有一个用 Python 编写的 Azure 函数,它有一个服务总线(主题)输出绑定。该函数由另一个队列触发,我们从 blobl 存储中处理一些文件,然后将另一条消息放入队列中。
我的 function.json 文件看起来像这样:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"queueName": "validation-output-queue",
"accessRights": "send",
"direction": "out"
}
],
"disabled": false
}
在我的函数中,我可以像这样将消息发送到另一个队列:
with open(os.environ['outputMessage'], 'w') as output_message:
output_message.write('This is my output test message !')
它工作正常。现在我想向主题发送消息。我创建了一个 SQLFilter
and I need to set some custom properties to the BrokeredMessage
.
的订阅
从 azure sdk for python,我发现我可以添加这样的自定义属性(我已经使用 pip 安装了 azure 模块):
from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
broker_properties={'Label': 'M3'},
custom_properties={'Priority': 'Medium',
'Customer': 'ABC'}
)
我的新 function.json 文件如下所示:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"topicName": "validation-output-topic",
"accessRights": "send",
"direction": "out"
}
],
"disabled": false
}
而且我已经这样修改了我的函数:
from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
broker_properties={'Label': 'M3'},
custom_properties={'Priority': 'Medium',
'Customer': 'ABC'}
)
with open(os.environ['outputMessage'], 'w') as output_message:
output_message.write(sent_msg)
当我 运行 函数时,我得到这个异常:
TypeError: expected a string or other character buffer object
我尝试使用 buffer
and the memoryview
函数,但仍然出现另一个异常:
TypeError: cannot make memory view because object does not have the buffer interface
我想知道实际绑定是否支持 BrokeredMessage 以及如何处理它?
Python(和其他脚本语言)的 ServiceBus 输出绑定仅支持简单的字符串映射,其中您指定的字符串成为在幕后创建的 BrokeredMessage 的内容。要设置任何扩展属性或执行任何更复杂的操作,您必须自己在函数中使用 Azure Python SDK。
在同样的情况下,我需要在输出服务总线中添加用户属性queue/topic,我直接使用了azure.servicebus.ServiceBusClient。
sb.Message class 有一个 user_properties setter:
def main(
httpreq: func.HttpRequest,
context: func.Context ):
sbClient : sb.ServiceBusClient = sb.ServiceBusClient.from_connection_string( os.getenv("AzureWebJobsServiceBus") )
topicClient : sb.TopicClient = sbClient.get_topic('scoring-testtopic')
message = sb.Message( httpreq.get_body().decode( 'UTF-8' ))
message.user_properties = {
'@AzureWebJobsParentId' : context.invocation_id,
'Prom' : '31000001'
}
topicClient.send( message )
我有一个用 Python 编写的 Azure 函数,它有一个服务总线(主题)输出绑定。该函数由另一个队列触发,我们从 blobl 存储中处理一些文件,然后将另一条消息放入队列中。
我的 function.json 文件看起来像这样:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"queueName": "validation-output-queue",
"accessRights": "send",
"direction": "out"
}
],
"disabled": false
}
在我的函数中,我可以像这样将消息发送到另一个队列:
with open(os.environ['outputMessage'], 'w') as output_message:
output_message.write('This is my output test message !')
它工作正常。现在我想向主题发送消息。我创建了一个 SQLFilter
and I need to set some custom properties to the BrokeredMessage
.
从 azure sdk for python,我发现我可以添加这样的自定义属性(我已经使用 pip 安装了 azure 模块):
from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
broker_properties={'Label': 'M3'},
custom_properties={'Priority': 'Medium',
'Customer': 'ABC'}
)
我的新 function.json 文件如下所示:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"topicName": "validation-output-topic",
"accessRights": "send",
"direction": "out"
}
],
"disabled": false
}
而且我已经这样修改了我的函数:
from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
broker_properties={'Label': 'M3'},
custom_properties={'Priority': 'Medium',
'Customer': 'ABC'}
)
with open(os.environ['outputMessage'], 'w') as output_message:
output_message.write(sent_msg)
当我 运行 函数时,我得到这个异常:
TypeError: expected a string or other character buffer object
我尝试使用 buffer
and the memoryview
函数,但仍然出现另一个异常:
TypeError: cannot make memory view because object does not have the buffer interface
我想知道实际绑定是否支持 BrokeredMessage 以及如何处理它?
Python(和其他脚本语言)的 ServiceBus 输出绑定仅支持简单的字符串映射,其中您指定的字符串成为在幕后创建的 BrokeredMessage 的内容。要设置任何扩展属性或执行任何更复杂的操作,您必须自己在函数中使用 Azure Python SDK。
在同样的情况下,我需要在输出服务总线中添加用户属性queue/topic,我直接使用了azure.servicebus.ServiceBusClient。
sb.Message class 有一个 user_properties setter:
def main(
httpreq: func.HttpRequest,
context: func.Context ):
sbClient : sb.ServiceBusClient = sb.ServiceBusClient.from_connection_string( os.getenv("AzureWebJobsServiceBus") )
topicClient : sb.TopicClient = sbClient.get_topic('scoring-testtopic')
message = sb.Message( httpreq.get_body().decode( 'UTF-8' ))
message.user_properties = {
'@AzureWebJobsParentId' : context.invocation_id,
'Prom' : '31000001'
}
topicClient.send( message )