Rerunning a message from a Service Bus subscription with PowerShell

Tanguy SCHOUBERT
Published by Tanguy SCHOUBERT
Category : Azure
05/08/2022

We are discussing here how to set up a retry mechanism. To do so, we need to move messages from a Service Bus subscription to another topic with a time frame as the input parameter. We will see how this is done using a PowerShell script.

 

Description of the requirement

 

A stream retrieves messages from a Service Bus subscription using Peek-Lock mode. This stream then generates an error because a system is temporarily unavailable. With each attempt, the message’s DeliveryCount property is incremented until it hits the maximum delivery count value. The message is automatically moved to the dead letter queue. A process then moves this message to a special topic, waiting for the system to once again become operational so the message can be inserted correctly. In reality, this will not affect just one, but all the messages that caused an error while the system was unavailable.

 

Script parameters

 

At the start of the script, we first need to enter all the following parameters:

 

$ConnectionString = 'your-connection-string' #ServiceBus Namespace connection string
$ReceiveTopic = 'your-source-topic' #The topic in which you wish to take messages
$ReceiveSubscription = 'your-source-subscription' #The subscription in which you wish to take messages
$SendTopic = "your-target-topic" #The topic in which you wish to send the messages
$MinDateTime = [datetime]::ParseExact('07-05-2021 15:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #Start time
$MaxDateTime = [datetime]::ParseExact('07-05-2021 16:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #End time

 

Generating the SAS token

 

We have to have an SAS token to interact with Service Bus entities:

 

$TokenValidFor = 3600
$Pattern = 'Endpoint=(.+);SharedAccessKeyName=(.+);SharedAccessKey=(.+)'
([uri]$Endpoint), $PolicyName, $Key = ($ConnectionString -replace $Pattern, '$1;$2;$3') -split ';'
$UrlEncodedEndpoint = [System.Web.HttpUtility]::UrlEncode($Endpoint)
$Expiry = [DateTimeOffset]::Now.ToUnixTimeSeconds() + $TokenValidFor
$RawSignatureString = "$UrlEncodedEndpoint`n$Expiry"
$HMAC = New-Object System.Security.Cryptography.HMACSHA256
$HMAC.Key = [Text.Encoding]::ASCII.GetBytes($Key)
$HashBytes = $HMAC.ComputeHash([Text.Encoding]::ASCII.GetBytes($RawSignatureString))
$SignatureString = [Convert]::ToBase64String($HashBytes)
$UrlEncodedSignatureString = [System.Web.HttpUtility]::UrlEncode($SignatureString)
$SASToken = "SharedAccessSignature sig=$UrlEncodedSignatureString&se=$Expiry&skn=$PolicyName&sr=$UrlEncodedEndpoint"

 

Moving messages

 

Retrieving a message

The message is retrieved using Peek-Lock mode:

 

$Receive = @{
    Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head"
    Method  = 'Post'
    Headers = @{
        'Authorization' = $SASToken
    }
}
$ReceiveResult = Invoke-WebRequest @Receive

 

Message sent

The message is then sent, taking care to add into the header list all the properties we want to keep. In our example, if the EventType is used for routing and we also want to retain a customer ID and a transaction ID:

 

$Send = @{
    Uri         = "https://$($Endpoint.Host)/$SendTopic/messages"
    ContentType = 'application/json'
    Method      = 'Post'
    Body        = $ReceiveResult.Content
    Headers     = @{
        'Authorization'         = $SASToken
        'EventType'             = "$($ReceiveResult.Headers.EventType)"
        'CustomerID'            = "$($ReceiveResult.Headers.CustomerID)"
        'TransactionID'         = "$($ReceiveResult.Headers.TransactionID)"
    }
}
$SendResult = Invoke-WebRequest @Send

 

Deleting the source message if sending is successful

The source message is deleted depending on the response code from the “send” operation:

 

if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') {
    $Delete = @{
        Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)"
        Method  = 'Delete'
        Headers = @{
            'Authorization' = $SASToken
        }
    }
    Invoke-WebRequest @Delete
}

 

Retrieving a message

Lastly, we just need to repeat these operations for every message found within the defined time window. To do so, we apply the following condition:

 

if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime)

 

Final script to move messages

#For all messages in subscription
while ($true) {
    #Get message from subscription
    $Receive = @{
        Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head"
        Method  = 'Post'
        Headers = @{
            'Authorization' = $SASToken
        }
    }
    $ReceiveResult = Invoke-WebRequest @Receive

    #Send message to topic
    if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime) {
        $Send = @{
            Uri         = "https://$($Endpoint.Host)/$SendTopic/messages"
            ContentType = 'application/json'
            Method      = 'Post'
            Body        = $ReceiveResult.Content
            Headers     = @{
                'Authorization'         = $SASToken
                'EventType'             = "$($ReceiveResult.Headers.EventType)"
                'CustomerID'            = "$($ReceiveResult.Headers.CustomerID)"
                'TransactionID'         = "$($ReceiveResult.Headers.TransactionID)"
            }
        }
        $SendResult = Invoke-WebRequest @Send
    
        #Complete message after sending operation in success
        if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') {
            $Delete = @{
                Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)"
                Method  = 'Delete'
                Headers = @{
                    'Authorization' = $SASToken
                }
            }
            Invoke-WebRequest @Delete
        }
    }
    else {
        break;
    }
}

 

Our script is ready. For a shared usage, it just needs to be added to a runbook on Azure.