We are discussing here how to set up a retry mechanism. To do so, we need to move messages from a Service Bus subscription to another topic with a time frame as the input parameter. We will see how this is done using a PowerShell script.
A stream retrieves messages from a Service Bus subscription using Peek-Lock mode. This stream then generates an error because a system is temporarily unavailable. With each attempt, the message’s DeliveryCount property is incremented until it hits the maximum delivery count value. The message is automatically moved to the dead letter queue. A process then moves this message to a special topic, waiting for the system to once again become operational so the message can be inserted correctly. In reality, this will not affect just one, but all the messages that caused an error while the system was unavailable.
At the start of the script, we first need to enter all the following parameters:
$ConnectionString = 'your-connection-string' #ServiceBus Namespace connection string $ReceiveTopic = 'your-source-topic' #The topic in which you wish to take messages $ReceiveSubscription = 'your-source-subscription' #The subscription in which you wish to take messages $SendTopic = "your-target-topic" #The topic in which you wish to send the messages $MinDateTime = [datetime]::ParseExact('07-05-2021 15:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #Start time $MaxDateTime = [datetime]::ParseExact('07-05-2021 16:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #End time
We have to have an SAS token to interact with Service Bus entities:
$TokenValidFor = 3600 $Pattern = 'Endpoint=(.+);SharedAccessKeyName=(.+);SharedAccessKey=(.+)' ([uri]$Endpoint), $PolicyName, $Key = ($ConnectionString -replace $Pattern, '$1;$2;$3') -split ';' $UrlEncodedEndpoint = [System.Web.HttpUtility]::UrlEncode($Endpoint) $Expiry = [DateTimeOffset]::Now.ToUnixTimeSeconds() + $TokenValidFor $RawSignatureString = "$UrlEncodedEndpoint`n$Expiry" $HMAC = New-Object System.Security.Cryptography.HMACSHA256 $HMAC.Key = [Text.Encoding]::ASCII.GetBytes($Key) $HashBytes = $HMAC.ComputeHash([Text.Encoding]::ASCII.GetBytes($RawSignatureString)) $SignatureString = [Convert]::ToBase64String($HashBytes) $UrlEncodedSignatureString = [System.Web.HttpUtility]::UrlEncode($SignatureString) $SASToken = "SharedAccessSignature sig=$UrlEncodedSignatureString&se=$Expiry&skn=$PolicyName&sr=$UrlEncodedEndpoint"
The message is retrieved using Peek-Lock mode:
$Receive = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head" Method = 'Post' Headers = @{ 'Authorization' = $SASToken } } $ReceiveResult = Invoke-WebRequest @Receive
The message is then sent, taking care to add into the header list all the properties we want to keep. In our example, if the EventType is used for routing and we also want to retain a customer ID and a transaction ID:
$Send = @{ Uri = "https://$($Endpoint.Host)/$SendTopic/messages" ContentType = 'application/json' Method = 'Post' Body = $ReceiveResult.Content Headers = @{ 'Authorization' = $SASToken 'EventType' = "$($ReceiveResult.Headers.EventType)" 'CustomerID' = "$($ReceiveResult.Headers.CustomerID)" 'TransactionID' = "$($ReceiveResult.Headers.TransactionID)" } } $SendResult = Invoke-WebRequest @Send
The source message is deleted depending on the response code from the “send” operation:
if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') { $Delete = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)" Method = 'Delete' Headers = @{ 'Authorization' = $SASToken } } Invoke-WebRequest @Delete }
Lastly, we just need to repeat these operations for every message found within the defined time window. To do so, we apply the following condition:
if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime)
#For all messages in subscription while ($true) { #Get message from subscription $Receive = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head" Method = 'Post' Headers = @{ 'Authorization' = $SASToken } } $ReceiveResult = Invoke-WebRequest @Receive #Send message to topic if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime) { $Send = @{ Uri = "https://$($Endpoint.Host)/$SendTopic/messages" ContentType = 'application/json' Method = 'Post' Body = $ReceiveResult.Content Headers = @{ 'Authorization' = $SASToken 'EventType' = "$($ReceiveResult.Headers.EventType)" 'CustomerID' = "$($ReceiveResult.Headers.CustomerID)" 'TransactionID' = "$($ReceiveResult.Headers.TransactionID)" } } $SendResult = Invoke-WebRequest @Send #Complete message after sending operation in success if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') { $Delete = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)" Method = 'Delete' Headers = @{ 'Authorization' = $SASToken } } Invoke-WebRequest @Delete } } else { break; } }
Our script is ready. For a shared usage, it just needs to be added to a runbook on Azure.