Agréger des messages dans une orchestration avec un pipeline d’envoi

Hao Wu
Publié par Hao
Catégorie : BizTalk
20/10/2019

Contexte

Dans cet article, nous proposons de faire appel au pipeline d’envoi dans une orchestration afin d’être en mesure d’agréger les messages. Voici les avantages de cette méthode :

  • La réelle maîtrise de la structure des messages agrégés ;
  • Le meilleur contrôle des conditions pour agréger les messages unitaires, avec une boucle dans l’orchestration ;
  • La possibilité d’effectuer plus de traitements sur les messages unitaires à agréger ;
  • Etc.

Nous avons mis en place une autre méthode d’agrégation qui utilise notamment le Task Scheduler de Windows. Pour plus ce détails sur ce mode de fonctionnement, dirigez-vous vers l’article suivant (article à venir).

Description d’un cas simple

Notre objectif est d’agréger toutes les factures reçues dans un certain laps de temps ou d’agréger un certain nombre de factures dans une facture enveloppe.

Supposons que les factures soient modélisées avec le schéma suivant :

Schéma Invoice

Pour faire l’agrégation, nous avons besoin d’un schéma « conteneur » pour stocker chaque message unitaire à agréger.

Archive Invoice

Dont le node « Body » est de type « AnyType » et « Unbounded ».

Nous allons créer un pipeline d’envoi en sélectionnant seulement le composant XML assembler tel que ci-dessous :

Pipeline

Voici les propriétés du composant :

Pipeline properties

Nous utilisons :

  • Add processing instructions : « Append », pour pouvoir agréger les messages ;
  • Document schemas : choisir le schéma unitaire : ici le schéma Invoice,
  • Enveloppe schemas : choisir le schéma qui servira à stocker les messages unitaires : ici le schéma ArchiveInvoice.

Contenu de l’orchestration

Par la suite, nous allons créer une orchestration en appelant le pipeline d’envoi pour faire l’agrégation en référençant les 2 assemblies suivantes : Microsoft.BizTalk.Pipeline.dll et Microsoft.XLANGs.Pipeline.dll. Voici l’orchestration :

Orchestration 1Orchestration 2

 

Dans notre exemple, l’orchestration agrège toutes les factures unitaires qui passent dans le système pendant 30 secondes ou elle s’arrête dès la réception de 5 factures.

Nous allons voir en détail comment l’orchestration fait l’agrégation.

Créons un premier port de réception « ReceiveInvoice », puis initialisons une corrélation nommé CorRcvPort qui est de type BTS.ReceivePortName car nous allons avoir un autre port de réception pour la réception des autres factures.

Créons ensuite une variable «vmsgToAggregate»  de type Microsoft.XLANGs.Pipeline.SendPipelineInputMessages, dans la shape Expression AggregateInvoice, dont voici le code :

vmsgToAggregate.Add(msgInvoice);

Où msgInvoice est ce que nous recevons dans la première réception.

Nous passons ensuite dans la boucle dans laquelle nous recevons les autres factures, la condition de sortie étant :

ExistNextMsg && vNbrInvoices < 5

Où ExistNextMsg est une variable de type boolean et initialisée à True, vNbrInvoices est une variable de type System.Int32 et initialisée à 1 car nous avons déjà reçu une facture (qui a déclenché l’orchestration).

Dans la shape Listen, nous utilisons la première branche pour continuer à recevoir d’autres factures. La deuxième branche est un compteur de temps où nous attendons 30 secondes et nous mettons le booléen ExistNextMsg à False pour sortir de la boucle.

Pour le deuxième port de réception « ReceiveNextInvoice », mettons Following Correlation Sets à CorRcvPort que nous avons créé plus tôt, ce qui permet de déclencher la même instance d’orchestration pour les autres messages.

Dans la Shape Expression AggregateNextInvoice, nous faisons la même opération pour l’agrégation, c’est-à-dire :

vmsgToAggregate.Add(msgInvoice);

A la fin de la première branche, nous incrémentons notre compteur vNbrInvoices.

Par la suite, nous entrons dans le scope ExecuteSendPipeline, c’est là où nous allons appeler le pipeline d’envoi créé précédemment pour faire l’agrégation. Créons un message msgAggregate de type System.Xml.XmlDocument, avec l’initialisation suivante :

msgAggregate = new System.Xml.XmlDocument();
msgAggregate.LoadXml("<temp>Temp</temp>");

Dans la shape MessageAssignment ExecuteSendPipeline, nous avons le code suivant :

Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(typeof(ArchiveInvoices.Pipeline.AddInvoicesSendPipeline),vmsgToAggregate,msgAggregate);

La méthode ExecuteSendPipeline de la classe XLANGPipelineManager attend les paramètres suivants :

  • System.Type(SendPipelineType) : ici ce sera notre SendPipeline AddInvoicesSendPipeline ;
  • XLANGs.Pipeline.SendPipelineInputMessage(inMessage) : ici le message entrant vmsgToAggregate ;
  • XLANGS.BaseTypes.XLANGMessage(outXLANGMsg) : ici le message sortant (que nous construisons dans la shape Message Construction) msgAggregate qui est de type System.Xml.XmlDocument.

A la fin, nous affectons le message au msgArchiveInvoices, et l’envoyons au port d’envoi.

Une fois le tout déployé dans Biztalk, nous créons un port de réception et un port d’envoi physique et nous les lions avec les ports logiques de l’orchestration.

Testons désormais notre processus avec l’exemple de facture suivant :

Exemple Facture

Nous allons envoyer plusieurs messages (ici 3 factures) d’ici 30 secondes, Voici le message sortant après l’agrégation :

Exemple Factures

Comme vous pouvez le constater, nous avons bien obtenu un message agrégé de plusieurs messages unitaires.

Pour aller plus loin

Réflexion 1 :

Vous avez constaté que la structure du message sortant que nous avons obtenu n’est pas très satisfaisante car il y a une balise « Body » qui a servi de conteneur pour les messages unitaires. Avec une étape intermédiaire, nous pouvons nous en sortir pour enlever ce niveau de balise. Il suffit de passer par 2 maps comme ci-dessous (ou une seule map si réalisée en full XSLT).

Modélisons un schéma intermédiaire : Un schéma InvoicesBody qui est un schéma enveloppe et importons le schéma unitaire Invoice dans une balise Body.

Schéma Invoice

Nous allons construire une facture qui inclut toutes les factures. Voici le schéma :

Schéma Invoices

Dans le schéma Invoices.xsd, nous importons le schéma Invoice et mettons le nœud en « unbounded ».

Créons d’abord la map suivante :

Mapping Archive Invoice vers Invoices

Nous faisons un « Mass copy » pour typer le message msgArchiveInvoices, car la balise Body dans ce dernier est de type AnyType. Voici le résulat de cette map :

Résultat mapping

Faisons ensuite la map suivante :

Mapping Invoices Body vers Invoices

Cette fois-ci, nous avons passé la balise « Body » et obtenu un message enveloppe propre :

Résultat mapping

Il suffit d’ajouter ces deux transformations à la fin de l’orchestration pour le résultat final :

Orchestration 3

Réflexion 2 :

Lorsque plusieurs messages sont traités dans une instance d’orchestration, l’erreur suivante peut apparaître :

0xC0C01B4C The instance completed without consuming all of its messages. The instance and its unconsumed messages have been suspended.().

Nous sommes ici en présence de messages « Zombies ». La gestion de ce type de messages fera l’objet l’un prochain article de blog.

Plus d’informations sur les messages Zombies : Zombie dans BizTalk Server.