Get all messages with info from service bus queue with powershell
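My question: is it possible to get all the information, such as content and headers, for every message in a Service Bus queue using PowerShell?
Here is the request I am using: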
$postService = Invoke-WebRequest -Uri "https://myservice.servicebus.windows.net/test/messages/head" -Headers $header -Method Post
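But it returns a different message each time. I need either the same message or the full list. Unfortunately, I could not find a way to do this with PS, only with C# =/
Maybe someone can help?
Download the code and use Get-Message
to get messages from Service Bus.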
function Get-Message {
<#
.SYNOPSIS
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
.DESCRIPTION
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
.OUTPUTS
This Cmdlet returns the Messaging Factory message object. On failure it retries up to '-Retry' times; if every attempt fails it returns $null.
.INPUTS
See PARAMETER section for a description of input parameters.
.EXAMPLE
Retrieves a message from ServiceBus. The message is immediately removed from the facility after receipt.
PS > Get-Message;
CorrelationId :
SessionId :
ReplyToSessionId :
DeliveryCount : 1
ExpiresAtUtc : 31.12.9999 23:59:59
LockedUntilUtc :
LockToken :
MessageId : e6dc938213264de790e53ace7949f610
ContentType :
Label :
Properties : {}
ReplyTo :
EnqueuedTimeUtc : 24.12.2015 21:00:00
ScheduledEnqueueTimeUtc : 01.01.0001 00:00:00
SequenceNumber : 1
EnqueuedSequenceNumber : 0
Size : 6
State : Active
TimeToLive : 10675199.02:48:05.4775807
To :
IsBodyConsumed : False
.EXAMPLE
Retrieves a message from ServiceBus and save the message contents (body) to a variable. The message is immediately removed from the facility after receipt.
PS > $message = Get-Message -BodyAsProperty;
PS > write-host $message.Properties['Body']
I am a message body from ServiceBus with an arbitrary content.
.EXAMPLE
Similar to the previous example, but message is explicitly removed from the facility (in case the module default configuration is set to 'PeekLock').
PS > $message = Get-Message -ReceiveAndDelete -BodyAsProperty;
PS > write-host $message.Properties['Body']
I am a message body from ServiceBus with an arbitrary content.
.EXAMPLE
This example receives a message from the service bus. The message will be kept in the queue until it is explicitly marked as 'Complete'.
PS > $message = Get-Message -Receivemode 'PeekLock';
PS > # do something with the message before you acknowledge receipt
PS > $message.Complete();
.EXAMPLE
This example retrieves a message from the ServiceBus facility 'Topic-Newsletter'. The facility will be created if it does not exist.
PS > $message = Get-Message -Facility 'Topic-Newsletter' -EnsureFacility;
.EXAMPLE
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory against server defined within module configuration xml file.
PS > $receiver = Get-MessageReceiver -Facility 'MyQueue1';
PS > $message = Get-Message -Client $receiver;
#>
[CmdletBinding(
HelpURI = 'http://dfch.biz/biz/dfch/PS/AzureServiceBus/Client/'
)]
[OutputType([Microsoft.ServiceBus.Messaging.BrokeredMessage])]
Param
(
# [Optional] The WaitTimeOutSec such as '3' Seconds for receiving a message before it times out. If you do not specify this
# value, the default below is [int]::MaxValue seconds, so the call waits practically indefinitely.
[Parameter(Mandatory = $false, Position = 0)]
[ValidateNotNullorEmpty()]
[int] $WaitTimeoutSec = [int]::MaxValue
,
# [Optional] The Receivemode such as 'PeekLock'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 1)]
[ValidateSet('PeekLock', 'ReceiveAndDelete')]
[string] $Receivemode = 'ReceiveAndDelete'
,
# [Optional] ReceiveAndDelete same as Receivemode 'ReceiveAndDelete'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 2)]
[switch] $ReceiveAndDelete = $false
,
# [Optional] ReceiveAndComplete same as Receivemode 'PeekLock' and $Message.Complete(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 3)]
[switch] $ReceiveAndComplete = $false
,
# [Optional] ReceiveAndAbandon same as Receivemode 'PeekLock' and $Message.Abandon(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 4)]
[switch] $ReceiveAndAbandon = $false
,
# [Optional] BodyAsProperty writes the Message Body to Property 'Body'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 5)]
[switch] $BodyAsProperty = $false
,
# [Optional] The Facility such as 'MyQueue'. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory = $false, Position = 6)]
[ValidateNotNullorEmpty()]
[alias("queue")]
[alias("subscription")]
[alias("QueueName")]
[string] $Facility = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).ReceiveFacility
,
# [Optional] Specifies if the facility will be created if it does not exist
[Parameter(Mandatory = $false, Position = 7)]
[alias("ensure")]
[alias("broadcast")]
[alias("checkfacility")]
[switch]$EnsureFacility = $false
,
# [Optional] Messaging Client (instance of the MessagingFactory)
[Parameter(Mandatory = $false, Position = 8)]
[alias("MessageClient")]
$Client
,
# [Optional] Skip retrying
[Parameter(Mandatory = $false, Position = 9)]
[switch]$NoRetry = $false
,
# [Optional] The Retry. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory=$false, Position = 10)]
[int]$Retry = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).CommandRetry
,
# [Optional] The RetryInterval. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory=$false, Position = 11)]
[int]$RetryInterval = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).CommandRetryInterval
)
BEGIN
{
$datBegin = [datetime]::Now;
[string] $fn = $MyInvocation.MyCommand.Name;
Log-Debug $fn ("CALL. Facility '{0}'" -f $Facility ) -fac 1;
# Factory validation
if((Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).Factory -isnot [Microsoft.ServiceBus.Messaging.MessagingFactory]) {
$msg = "Factory: Factory validation FAILED. Connect to the server before using the Cmdlet.";
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).Factory;
$PSCmdlet.ThrowTerminatingError($e);
} # if
}
# BEGIN
PROCESS
{
[boolean] $fReturn = $false;
# Get all parameters
$Params = @{};
$ParamsList = (Get-Command -Name $MyInvocation.InvocationName).Parameters;
foreach ($key in $ParamsList.keys)
{
$var = Get-Variable -Name $key -ErrorAction SilentlyContinue;
if($var)
{
if ( @('Retry', 'RetryInterval', 'NoRetry') -notcontains $($var.name) -and $var.value -ne $null -and $($var.value) -ne '' )
{
$Params.Add($($var.name), $var.value);
}
}
}
#Log-Debug $fn ("Operation [{0}] arguments: {1}" -f ($fn -replace 'Worker', ''), ($Params | Out-String));
# Retry handling
for($c = 1; $c -le ($Retry+1); $c++)
{
try
{
$OutputParameter = Get-MessageWorker @Params;
break;
}
catch
{
# Throw last exception
if ( $NoRetry -or
$c -gt $Retry -or
$_.Exception.Message -match 'Connect to the message factory before using the Cmdlet.' -or
$_.Exception.Message -match 'The message body cannot be read multiple times.'
)
{
if ($PSCmdlet.MyInvocation.BoundParameters["Debug"].IsPresent)
{
throw;
}
else
{
break;
}
}
Log-Debug $fn ("[{0}/{1}] Retrying operation [{2}]" -f $c, $Retry, ($fn -replace 'Worker', ''));
Start-Sleep -Seconds $RetryInterval;
$RetryInterval *= 2;
continue;
}
}
# Set the flag before returning; a statement after 'return' is never reached
$fReturn = $true;
return $OutputParameter;
}
# PROCESS
END
{
$datEnd = [datetime]::Now;
Log-Debug -fn $fn -msg ("RET. fReturn: [{0}]. Execution time: [{1}]ms. Started: [{2}]." -f $fReturn, ($datEnd - $datBegin).TotalMilliseconds, $datBegin.ToString('yyyy-MM-dd HH:mm:ss.fffzzz')) -fac 2;
}
# END
} # function
if($MyInvocation.ScriptName) { Export-ModuleMember -Function Get-Message; }
function Get-MessageWorker {
<#
.SYNOPSIS
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
.DESCRIPTION
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
.OUTPUTS
This Cmdlet returns the Messaging Factory message object (BrokeredMessage). On failure it returns $null.
.INPUTS
See PARAMETER section for a description of input parameters.
.EXAMPLE
$message = Get-MessageWorker;
Receives a message from a receiver client, which is based on the Service Bus Messaging Factory.
#>
[CmdletBinding(
HelpURI = 'http://dfch.biz/biz/dfch/PS/AzureServiceBus/Client/'
)]
[OutputType([Microsoft.ServiceBus.Messaging.BrokeredMessage])]
Param
(
# [Optional] The WaitTimeOutSec such as '3' Seconds for receiving a message before it times out. If you do not specify this
# value, the default below is [int]::MaxValue seconds, so the call waits practically indefinitely.
[Parameter(Mandatory = $false, Position = 0)]
[ValidateNotNullorEmpty()]
[int] $WaitTimeoutSec = [int]::MaxValue
,
# [Optional] The Receivemode such as 'PeekLock'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 1)]
[ValidateSet('PeekLock', 'ReceiveAndDelete')]
[string] $Receivemode = 'ReceiveAndDelete'
,
# [Optional] ReceiveAndDelete same as Receivemode 'ReceiveAndDelete'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 2)]
[switch] $ReceiveAndDelete = $false
,
# [Optional] ReceiveAndComplete same as Receivemode 'PeekLock' and $Message.Complete(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 3)]
[switch] $ReceiveAndComplete = $false
,
# [Optional] ReceiveAndAbandon same as Receivemode 'PeekLock' and $Message.Abandon(). If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 4)]
[switch] $ReceiveAndAbandon = $false
,
# [Optional] BodyAsProperty writes the Message Body to Property 'Body'. If you do not specify this
# value it is taken from the default parameter.
[Parameter(Mandatory = $false, Position = 5)]
[switch] $BodyAsProperty = $false
,
# [Optional] The Facility such as 'MyQueue'. If you do not specify this
# value it is taken from the module configuration file.
[Parameter(Mandatory = $false, Position = 6)]
[ValidateNotNullorEmpty()]
[alias("queue")]
[alias("subscription")]
[alias("QueueName")]
[string] $Facility = (Get-Variable -Name $MyInvocation.MyCommand.Module.PrivateData.MODULEVAR -ValueOnly).ReceiveFacility
,
# [Optional] Checks if facility existing
[Parameter(Mandatory = $false, Position = 7)]
[alias("ensure")]
[alias("broadcast")]
[alias("CreateIfNotExist")]
[switch] $EnsureFacility = $false
,
# [Optional] Messaging Client (instance of the MessagingFactory)
[Parameter(Mandatory = $false, Position = 8)]
[alias("MessageClient")]
$Client
)
BEGIN
{
$datBegin = [datetime]::Now;
[string] $fn = $MyInvocation.MyCommand.Name;
Log-Debug $fn ("CALL. Facility '{0}'" -f $Facility ) -fac 1;
}
# BEGIN
PROCESS
{
[boolean] $fReturn = $false;
try
{
# Parameter validation
if ( $ReceiveAndDelete )
{
$Receivemode = 'ReceiveAndDelete';
}
# Check facility
if ( $EnsureFacility )
{
# -split and -match take regular expressions, so the literal backslashes must be escaped
$Path = ($Facility -Split "\\Subscriptions\\")[0];
$SubscriptionName = "RECV-{0}" -f (get-wmiobject Win32_ComputerSystemProduct | Select-Object -ExpandProperty UUID).toString();
if ( $Facility -match "\\Subscriptions\\" )
{
$SubscriptionName = ($Facility -Split "\\Subscriptions\\")[1];
}
try
{
$FacilitiyExists = New-MessageFacility -Path $Path -Name $SubscriptionName;
$Facility = '{0}\Subscriptions\{1}' -f $Path, $SubscriptionName;
}
catch
{
$msg = $_.Exception.Message;
Log-Error -msg $msg;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $Facility;
$PSCmdlet.ThrowTerminatingError($e);
}
}
# Create Client
if ( !$PSBoundParameters.ContainsKey('Client') )
{
try
{
$Client = Get-MessageReceiver -Facility $Facility -Receivemode $Receivemode;
}
catch
{
$msg = $_.Exception.Message;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $Client;
Log-Error $fn -msg $msg;
$PSCmdlet.ThrowTerminatingError($e);
}
}
# Get Message
try
{
# DFTODO - is this really an AMQP message when we are using the BrokeredMessage class?, see #5
[Microsoft.ServiceBus.Messaging.BrokeredMessage] $BrokeredMessage = $Client.Receive((New-TimeSpan -Seconds $WaitTimeoutSec));
} catch {
$msg = $_.Exception.Message;
$e = New-CustomErrorRecord -m $msg -cat InvalidData -o $BrokeredMessage;
Log-Error $fn -msg $msg;
$PSCmdlet.ThrowTerminatingError($e);
}
# Process Message
if ( $BrokeredMessage -ne $null )
{
if ( $ReceiveAndAbandon -and $Receivemode -ne 'ReceiveAndDelete' )
{
$BrokeredMessage.Abandon();
}
if ( $ReceiveAndComplete -and $Receivemode -ne 'ReceiveAndDelete' )
{
$BrokeredMessage.Complete();
}
if ( $BodyAsProperty )
{
$PropertyName = 'Body';
$PropertyValue = Get-MessageBody -Message $BrokeredMessage;
if ( $BrokeredMessage.Properties.ContainsKey($PropertyName) -and $BrokeredMessage.Properties[$PropertyName].toString() -ne $PropertyValue.toString() )
{
[int] $PropertyCount = 1;
$PropertyName = ("Body{0}" -f $PropertyCount);
while( $BrokeredMessage.Properties.ContainsKey($PropertyName) )
{
$PropertyCount += 1;
$PropertyName = ("Body{0}" -f $PropertyCount);
}
}
$BrokeredMessage.Properties[$PropertyName] = $PropertyValue;
}
}
$OutputParameter = $BrokeredMessage;
$fReturn = $true;
}
catch
{
if($gotoSuccess -eq $_.Exception.Message)
{
$fReturn = $true;
}
else
{
[string] $ErrorText = "catch [$($_.FullyQualifiedErrorId)]";
$ErrorText += (($_ | fl * -Force) | Out-String);
$ErrorText += (($_.Exception | fl * -Force) | Out-String);
$ErrorText += (Get-PSCallStack | Out-String);
if($_.Exception -is [System.Net.WebException])
{
Log-Critical $fn "Login to Uri '$Uri' with Username '$Username' FAILED [$_].";
Log-Debug $fn $ErrorText -fac 3;
}
else
{
Log-Error $fn $ErrorText -fac 3;
if($gotoError -eq $_.Exception.Message)
{
Log-Error $fn $e.Exception.Message;
$PSCmdlet.ThrowTerminatingError($e);
}
elseif($gotoFailure -ne $_.Exception.Message)
{
Write-Verbose ("$fn`n$ErrorText");
}
else
{
# N/A
}
}
$fReturn = $false;
$OutputParameter = $null;
}
}
finally
{
# Clean up
# N/A
}
return $OutputParameter;
}
# PROCESS
END
{
$datEnd = [datetime]::Now;
Log-Debug -fn $fn -msg ("RET. fReturn: [{0}]. Execution time: [{1}]ms. Started: [{2}]." -f $fReturn, ($datEnd - $datBegin).TotalMilliseconds, $datBegin.ToString('yyyy-MM-dd HH:mm:ss.fffzzz')) -fac 2;
}
# END
} # function
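Get-Message returns one message per call, so getting "all" messages means calling it in a loop until it returns $null. The following is a minimal sketch of that loop, not part of the module above: Get-AllMessages, its -Receive script block and the -MaxMessages cap are hypothetical names introduced here for illustration. Note that the default receive mode in the code above is 'ReceiveAndDelete', so this loop empties the queue.

```powershell
# Hypothetical helper (not part of the module above): keeps calling a
# receive script block until it returns $null and collects the results.
function Get-AllMessages {
    param(
        # Script block that returns one message, or nothing when the queue is empty
        [Parameter(Mandatory = $true)]
        [scriptblock] $Receive,
        # Safety cap so that a queue which keeps filling cannot loop forever
        [int] $MaxMessages = 1000
    )
    $messages = New-Object 'System.Collections.Generic.List[object]'
    while ($messages.Count -lt $MaxMessages) {
        $message = & $Receive
        if ($null -eq $message) { break }
        $messages.Add($message)
    }
    # The leading comma returns the list as one object instead of unrolling it
    return ,$messages
}

# Usage with Get-Message (assumes the module is loaded and connected).
# Pass a short timeout, because the default WaitTimeoutSec is [int]::MaxValue:
# $all = Get-AllMessages -Receive { Get-Message -WaitTimeoutSec 3 -BodyAsProperty -NoRetry }
# $all | ForEach-Object { '{0}: {1}' -f $_.MessageId, $_.Properties['Body'] }
```

Passing the receive call as a script block keeps the loop independent of the module, so it can be tried out with a fake receiver before pointing it at a real queue.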
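I wrote a .NET wrapper for this. It exposes cmdlets that can be called natively from PowerShell. The repository is at https://github.com/sumanthvrao/AzurePowerShellCmdlets.
After importing the DLL, you can use the Get-ServiceBusMessage cmdlet to connect and fetch messages from a Service Bus topic, for example: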
$message = Get-ServiceBusMessage -connectionString $env:connString -topicName "test" -subscriptionName "abc"
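On the REST request from the question: a POST to .../messages/head is the peek-lock read of the Service Bus REST API, and each call hands out the next available message, which is why repeated calls return different messages. The message content is the response body, and the broker metadata (the "headers") arrives as JSON in the BrokerProperties response header. The sketch below combines the two into one object. ConvertFrom-BrokerResponse is a hypothetical helper name, and only a few of the broker properties are copied.

```powershell
# Hypothetical helper: flattens one REST response into a single object.
function ConvertFrom-BrokerResponse {
    param(
        # Response headers, e.g. $response.Headers from Invoke-WebRequest
        [Parameter(Mandatory = $true)]
        $Headers,
        # Response body, e.g. $response.Content
        [string] $Content
    )
    # BrokerProperties is a JSON string carried in a response header.
    # Piping works whether the header value is a string or a one-element array.
    $broker = $Headers['BrokerProperties'] | ConvertFrom-Json
    [pscustomobject]@{
        MessageId      = $broker.MessageId
        SequenceNumber = $broker.SequenceNumber
        DeliveryCount  = $broker.DeliveryCount
        LockToken      = $broker.LockToken
        Body           = $Content
    }
}

# Usage with the request from the question ($header must hold a valid Authorization value):
# $response = Invoke-WebRequest -Uri "https://myservice.servicebus.windows.net/test/messages/head" -Headers $header -Method Post
# ConvertFrom-BrokerResponse -Headers $response.Headers -Content $response.Content
```

Calling the request in a loop until the service answers 204 No Content, and converting each response this way, should give the full list. With POST the messages stay locked in the queue and become visible again once the lock expires. With DELETE on the same URL they are removed as they are read.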