Azure 函数 - Python - ServiceBus 输出绑定 - 设置自定义属性

Azure Function - Python - ServiceBus Output Binding - Setting Custom Properties

我有一个用 Python 编写的 Azure 函数,它有一个服务总线(主题)输出绑定。该函数由另一个队列触发,我们从 blobl 存储中处理一些文件,然后将另一条消息放入队列中。

我的 function.json 文件看起来像这样:

{
"bindings": [
    {
    "type": "serviceBus",
    "connection": "Omnibus_Input_Send_Servicebus",
    "name": "outputMessage",
    "queueName": "validation-output-queue",
    "accessRights": "send",
    "direction": "out"
    }
],
"disabled": false
}

在我的函数中,我可以像这样将消息发送到另一个队列:

with open(os.environ['outputMessage'], 'w') as output_message:
    output_message.write('This is my output test message !')

它工作正常。现在我想向主题发送消息。我创建了一个 SQLFilter and I need to set some custom properties to the BrokeredMessage.

的订阅

azure sdk for python,我发现我可以添加这样的自定义属性(我已经使用 pip 安装了 azure 模块):

from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
    broker_properties={'Label': 'M3'},
    custom_properties={'Priority': 'Medium',
                        'Customer': 'ABC'}
)

我的新 function.json 文件如下所示:

{
"bindings": [
    {
    "type": "serviceBus",
    "connection": "Omnibus_Input_Send_Servicebus",
    "name": "outputMessage",
    "topicName": "validation-output-topic",
    "accessRights": "send",
    "direction": "out"
    }
],
"disabled": false
}

而且我已经这样修改了我的函数:

from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
    broker_properties={'Label': 'M3'},
    custom_properties={'Priority': 'Medium',
                        'Customer': 'ABC'}
)

with open(os.environ['outputMessage'], 'w') as output_message:
    output_message.write(sent_msg)

当我 运行 函数时,我得到这个异常:

TypeError: expected a string or other character buffer object

我尝试使用 buffer and the memoryview 函数,但仍然出现另一个异常:

TypeError: cannot make memory view because object does not have the buffer interface

我想知道实际绑定是否支持 BrokeredMessage 以及如何处理它?

Python(和其他脚本语言)的 ServiceBus 输出绑定仅支持简单的字符串映射,其中您指定的字符串成为在幕后创建的 BrokeredMessage 的内容。要设置任何扩展属性或执行任何更复杂的操作,您必须自己在函数中使用 Azure Python SDK。

在同样的情况下,我需要在输出服务总线中添加用户属性queue/topic,我直接使用了azure.servicebus.ServiceBusClient。

sb.Message class 有一个 user_properties setter:

def main(
    httpreq: func.HttpRequest,
    context: func.Context  ):

sbClient : sb.ServiceBusClient = sb.ServiceBusClient.from_connection_string( os.getenv("AzureWebJobsServiceBus") )
topicClient : sb.TopicClient = sbClient.get_topic('scoring-testtopic')

message = sb.Message( httpreq.get_body().decode( 'UTF-8' ))

message.user_properties = { 
    '@AzureWebJobsParentId' : context.invocation_id,
    'Prom' : '31000001'
    }

topicClient.send( message )