Nous sommes ici dans le cadre de la mise en place d’un mécanisme de retry. Pour cela, nous avons besoin de déplacer des messages depuis une souscription Service Bus vers un autre topic avec en paramètre une fenêtre de temps. Nous allons voir comment faire cela grâce à un script PowerShell.
Un flux récupère des messages dans une souscription Service Bus en mode Peek&Lock. Ce flux tombe ensuite en erreur à cause de l’indisponibilité temporaire d’un système. À chaque essai, la propriété MaxDeliveryCount du message est incrémentée pour finalement atteindre sa valeur maximale. Le message est automatiquement déplacé en dead-letter. Un process déplace ensuite ce message vers un topic dédié, en attendant que le système soit de nouveau opérationnel et que l’on puisse ainsi insérer correctement le message. En réalité, cela ne concerne pas seulement un mais tous les messages qui sont tombés en erreur pendant la durée d’indisponibilité du système.
Au début du script, il nous faut d’abord renseigner tous ces paramètres :
$ConnectionString = 'your-connection-string' #ServiceBus Namespace connection string $ReceiveTopic = 'your-source-topic' #The topic in which you wish to take messages $ReceiveSubscription = 'your-source-subscription' #The subscription in which you wish to take messages $SendTopic = "your-target-topic" #The topic in which you wish to send the messages $MinDateTime = [datetime]::ParseExact('07-05-2021 15:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #Start time $MaxDateTime = [datetime]::ParseExact('07-05-2021 16:00:00 +2', 'dd-MM-yyyy HH:mm:ss z', $null) #End time
Pour interagir avec les entités Service Bus, il nous faut un token SAS :
$TokenValidFor = 3600 $Pattern = 'Endpoint=(.+);SharedAccessKeyName=(.+);SharedAccessKey=(.+)' ([uri]$Endpoint), $PolicyName, $Key = ($ConnectionString -replace $Pattern, '$1;$2;$3') -split ';' $UrlEncodedEndpoint = [System.Web.HttpUtility]::UrlEncode($Endpoint) $Expiry = [DateTimeOffset]::Now.ToUnixTimeSeconds() + $TokenValidFor $RawSignatureString = "$UrlEncodedEndpoint`n$Expiry" $HMAC = New-Object System.Security.Cryptography.HMACSHA256 $HMAC.Key = [Text.Encoding]::ASCII.GetBytes($Key) $HashBytes = $HMAC.ComputeHash([Text.Encoding]::ASCII.GetBytes($RawSignatureString)) $SignatureString = [Convert]::ToBase64String($HashBytes) $UrlEncodedSignatureString = [System.Web.HttpUtility]::UrlEncode($SignatureString) $SASToken = "SharedAccessSignature sig=$UrlEncodedSignatureString&se=$Expiry&skn=$PolicyName&sr=$UrlEncodedEndpoint"
On récupère le message en mode Peek&Lock :
$Receive = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head" Method = 'Post' Headers = @{ 'Authorization' = $SASToken } } $ReceiveResult = Invoke-WebRequest @Receive
On envoie ensuite le message en prenant soin de rajouter dans la liste de headers toutes les propriétés que nous voulons conserver. Dans notre exemple, l’EventType sert au routage et nous voulons conserver en plus un ID client et un ID transaction :
$Send = @{ Uri = "https://$($Endpoint.Host)/$SendTopic/messages" ContentType = 'application/json' Method = 'Post' Body = $ReceiveResult.Content Headers = @{ 'Authorization' = $SASToken 'EventType' = "$($ReceiveResult.Headers.EventType)" 'CustomerID' = "$($ReceiveResult.Headers.CustomerID)" 'TransactionID' = "$($ReceiveResult.Headers.TransactionID)" } } $SendResult = Invoke-WebRequest @Send
En se basant sur le code de réponse de l’opération d’envoi, on supprime le message source :
if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') { $Delete = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)" Method = 'Delete' Headers = @{ 'Authorization' = $SASToken } } Invoke-WebRequest @Delete }
Enfin, il ne nous reste plus qu’à répéter ces opérations pour tous les messages appartenant à la plage horaire définie. On se sert pour cela de la condition suivante :
if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime)
#For all messages in subscription while ($true) { #Get message from subscription $Receive = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/head" Method = 'Post' Headers = @{ 'Authorization' = $SASToken } } $ReceiveResult = Invoke-WebRequest @Receive #Send message to topic if ($null -ne $ReceiveResult -and $ReceiveResult.StatusCode -eq '201' -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -lt $MaxDateTime -and [datetime]::Parse($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).EnqueuedTimeUtc) -gt $MinDateTime) { $Send = @{ Uri = "https://$($Endpoint.Host)/$SendTopic/messages" ContentType = 'application/json' Method = 'Post' Body = $ReceiveResult.Content Headers = @{ 'Authorization' = $SASToken 'EventType' = "$($ReceiveResult.Headers.EventType)" 'CustomerID' = "$($ReceiveResult.Headers.CustomerID)" 'TransactionID' = "$($ReceiveResult.Headers.TransactionID)" } } $SendResult = Invoke-WebRequest @Send #Complete message after sending operation in success if ($null -ne $SendResult -and $SendResult.StatusCode -eq '201') { $Delete = @{ Uri = "https://$($Endpoint.Host)/$ReceiveTopic/subscriptions/$ReceiveSubscription/messages/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).MessageId)/$($($ReceiveResult.Headers.BrokerProperties | ConvertFrom-Json).LockToken)" Method = 'Delete' Headers = @{ 'Authorization' = $SASToken } } Invoke-WebRequest @Delete } } else { break; } }
Notre script est prêt. Pour une utilisation partagée, il ne reste plus qu’à l’ajouter à un runbook sur Azure.