Relancer un message depuis une souscription Service Bus avec PowerShell

Tanguy SCHOUBERT
Publié par Tanguy SCHOUBERT
Catégorie : Azure
19/08/2022

Nous sommes ici dans le cadre de la mise en place d’un mécanisme de retry. Pour cela, nous avons besoin de déplacer des messages depuis une souscription Service Bus vers un autre topic avec en paramètre une fenêtre de temps. Nous allons voir comment faire cela grâce à un script PowerShell.

 

Description du besoin

 

Un flux récupère des messages dans une souscription Service Bus en mode Peek&Lock. Ce flux tombe ensuite en erreur à cause de l’indisponibilité temporaire d’un système. À chaque essai, la propriété MaxDeliveryCount du message est incrémentée pour finalement atteindre sa valeur maximale. Le message est automatiquement déplacé en dead-letter. Un process déplace ensuite ce message vers un topic dédié, en attendant que le système soit de nouveau opérationnel et que l’on puisse ainsi insérer correctement le message. En réalité, cela ne concerne pas seulement un mais tous les messages qui sont tombés en erreur pendant la durée d’indisponibilité du système.

 

Paramètres du script

 

Au début du script, il nous faut d’abord renseigner tous ces paramètres :

$ConnectionString = 'your-connection-string' #ServiceBus Namespace connection string
$ReceiveTopic = 'your-source-topic' #The topic in which you wish to take messages
$ReceiveSubscription = 'your-source-subscription' #The subscription in which you wish to take messages
$SendTopic = "your-target-topic" #The topic in which you wish to send the messages
$MinDateTime = [datetime]::ParseExact('07-05-2021 15:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #Start time
$MaxDateTime = [datetime]::ParseExact('07-05-2021 16:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #End time

 

Génération du token SAS

 

Pour interagir avec les entités Service Bus, il nous faut un token SAS :

$TokenValidFor = 3600
$Pattern = 'Endpoint=(.+);SharedAccessKeyName=(.+);SharedAccessKey=(.+)'
([uri]$Endpoint), $PolicyName, $Key = ($ConnectionString -replace $Pattern, '$1;$2;$3') -split ';'
$UrlEncodedEndpoint = [System.Web.HttpUtility]::UrlEncode($Endpoint)
$Expiry = [DateTimeOffset]::Now.ToUnixTimeSeconds() + $TokenValidFor
$RawSignatureString = "$UrlEncodedEndpoint`n$Expiry"
$HMAC = New-Object System.Security.Cryptography.HMACSHA256
$HMAC.Key = [Text.Encoding]::ASCII.GetBytes($Key)
$HashBytes = $HMAC.ComputeHash([Text.Encoding]::ASCII.GetBytes($RawSignatureString))
$SignatureString = [Convert]::ToBase64String($HashBytes)
$UrlEncodedSignatureString = [System.Web.HttpUtility]::UrlEncode($SignatureString)
$SASToken = "SharedAccessSignature sig=$UrlEncodedSignatureString&se=$Expiry&skn=$PolicyName&sr=$UrlEncodedEndpoint"

 

Déplacement des messages

 

Récupération d’un message

On récupère le message en mode Peek&Lock :

$Receive = @{
    Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head"
    Method  = 'Post'
    Headers = @{
        'Authorization' = $SASToken
    }
}
$ReceiveResult = Invoke-WebRequest @Receive

 

Envoi du message

On envoie ensuite le message en prenant soin de rajouter dans la liste de headers toutes les propriétés que nous voulons conserver. Dans notre exemple, l’EventType sert au routage et nous voulons conserver en plus un ID client et un ID transaction :

$Send = @{
    Uri         = "https://$($Endpoint.Host)/$SendTopic/messages"
    ContentType = 'application/json'
    Method      = 'Post'
    Body        = $ReceiveResult.Content
    Headers     = @{
        'Authorization'         = $SASToken
        'EventType'             = "$($ReceiveResult.Headers.EventType)"
        'CustomerID'            = "$($ReceiveResult.Headers.CustomerID)"
        'TransactionID'         = "$($ReceiveResult.Headers.TransactionID)"
    }
}
$SendResult = Invoke-WebRequest @Send

 

Suppression du message source si l’envoi est un succès

En se basant sur le code de réponse de l’opération d’envoi, on supprime le message source :

if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') {
    $Delete = @{
        Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)"
        Method  = 'Delete'
        Headers = @{
            'Authorization' = $SASToken
        }
    }
    Invoke-WebRequest @Delete
}

 

Récupération d’un message

Enfin, il ne nous reste plus qu’à répéter ces opérations pour tous les messages appartenant à la plage horaire définie. On se sert pour cela de la condition suivante :

if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime)

 

Script final pour le déplacement de messages

#For all messages in subscription
while ($true) {
    #Get message from subscription
    $Receive = @{
        Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head"
        Method  = 'Post'
        Headers = @{
            'Authorization' = $SASToken
        }
    }
    $ReceiveResult = Invoke-WebRequest @Receive

    #Send message to topic
    if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime) {
        $Send = @{
            Uri         = "https://$($Endpoint.Host)/$SendTopic/messages"
            ContentType = 'application/json'
            Method      = 'Post'
            Body        = $ReceiveResult.Content
            Headers     = @{
                'Authorization'         = $SASToken
                'EventType'             = "$($ReceiveResult.Headers.EventType)"
                'CustomerID'            = "$($ReceiveResult.Headers.CustomerID)"
                'TransactionID'         = "$($ReceiveResult.Headers.TransactionID)"
            }
        }
        $SendResult = Invoke-WebRequest @Send
    
        #Complete message after sending operation in success
        if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') {
            $Delete = @{
                Uri     = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)"
                Method  = 'Delete'
                Headers = @{
                    'Authorization' = $SASToken
                }
            }
            Invoke-WebRequest @Delete
        }
    }
    else {
        break;
    }
}

 

Notre script est prêt. Pour une utilisation partagée, il ne reste plus qu’à l’ajouter à un runbook sur Azure.