This article shows how to aggregate messages by calling a send pipeline from an orchestration. The advantages of this method are:
- proper control over the structure of aggregated messages;
- full control over the conditions for aggregating individual messages, with a loop within the orchestration;
- the possibility of running further processing on the individual messages being aggregated.
We have also implemented another aggregation method, based on Windows Task Scheduler. For more details on that method, please see the article (article incoming).
Description of a simple case
Our objective here is to aggregate into one “envelope” invoice either all the invoices received over a certain period of time or a certain number of invoices.
Let’s assume the invoices are structured as follows:
To aggregate them, we need a “container” schema that stores each individual message to be aggregated. In this schema, the Body node is of type AnyType and is unbounded.
We are going to create a send pipeline that contains only the XML assembler component, as shown below:
The component’s properties are:
We are using:
- Add processing instructions: Append, to be able to aggregate messages;
- Document schemas: choose the individual schema – here, it is the Invoice schema;
- Envelope schemas: choose the schema that will be used to store individual messages – here, it is the ArchiveInvoice schema.
The next step is to create an orchestration that calls the send pipeline to perform the aggregation. To do this, the orchestration project must reference the following two assemblies: Microsoft.BizTalk.Pipeline.dll and Microsoft.XLANGs.Pipeline.dll. This is the orchestration:
In our example, the orchestration aggregates all the individual invoices that pass through the system for 30 seconds, or it stops once 5 invoices have been received.
We will now look in detail at how the orchestration performs aggregation.
Let’s create a first Receive shape called ReceiveInvoice and, on it, initialize a correlation set named CorRcvPort, based on the BTS.ReceivePortName property. A second Receive shape will receive the other invoices.
We then declare the vmsgToAggregate variable, of type Microsoft.XLANGs.Pipeline.SendPipelineInputMessages. In the AggregateInvoice Expression shape, we add msgInvoice to this variable.
Here, msgInvoice is the message received by the first Receive shape.
We then enter the loop in which we receive the other invoices. The loop continues as long as the following condition is true:
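The content of the AggregateInvoice Expression shape is not reproduced in this text. As a minimal sketch, assuming the shape only adds the first message to the collection through the Add method of SendPipelineInputMessages, it could look like this (XLANG/s expression code, which runs only inside an orchestration):

```csharp
// AggregateInvoice Expression shape (XLANG/s). Assumed content, not the original code.
// vmsgToAggregate: orchestration variable of type
//   Microsoft.XLANGs.Pipeline.SendPipelineInputMessages
// msgInvoice: the first invoice, received by the ReceiveInvoice shape
vmsgToAggregate.Add(msgInvoice);
```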
ExistNextMsg && vNbrInvoices < 5
Where ExistNextMsg is a Boolean variable initially set to True, and vNbrInvoices is a System.Int32 variable initially set to 1 because we have already received one invoice (which triggered the orchestration).
In the Listen shape, the first branch continues receiving invoices. The second branch is a timer: we wait 30 seconds, then set the Boolean ExistNextMsg to False to exit the loop. Because the Listen shape sits inside the loop, this 30-second wait starts again each time an invoice is received.
For the second Receive shape, ReceiveNextInvoice, we set Following Correlation Sets to CorRcvPort, as created earlier, so that the subsequent messages are routed to the same orchestration instance.
In the AggregateNextInvoice Expression shape, we repeat the same aggregation operation: we add the invoice just received to vmsgToAggregate.
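The timer branch can be sketched as follows. This is an illustration under assumptions rather than the original code: it supposes a Delay shape followed by an Expression shape, and it uses the three-argument System.TimeSpan constructor (hours, minutes, seconds).

```csharp
// Delay shape in the second branch of the Listen shape (XLANG/s): wait 30 seconds.
new System.TimeSpan(0, 0, 30);

// Expression shape placed after the Delay shape: make the loop condition false.
ExistNextMsg = false;
```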
At the end of the first branch, we increment our vNbrInvoices counter.
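The first branch of the Listen shape could therefore contain something like the code below. It is only a sketch: msgNextInvoice is a hypothetical name for the message received by ReceiveNextInvoice, and the two statements may live in one or two Expression shapes.

```csharp
// AggregateNextInvoice Expression shape (XLANG/s). Assumed content.
// msgNextInvoice is a hypothetical name for the message received by ReceiveNextInvoice.
vmsgToAggregate.Add(msgNextInvoice);

// End of the first branch: count the invoice that was just added.
vNbrInvoices = vNbrInvoices + 1;
```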
Next, we enter the ExecuteSendPipeline scope. This is where we call the send pipeline created earlier to carry out the aggregation. We create a msgAggregate message, of type System.Xml.XmlDocument, initialized as follows:
msgAggregate = new System.Xml.XmlDocument();
The Message Assignment shape ExecuteSendPipeline contains the following code:
The ExecuteSendPipeline method of the XLANGPipelineManager class expects the following input parameters:
- System.Type (sendPipelineType): here, this will be our send pipeline, AddInvoicesSendPipeline;
- Microsoft.XLANGs.Pipeline.SendPipelineInputMessages (inMessages): here, the incoming messages, vmsgToAggregate;
- Microsoft.XLANGs.BaseTypes.XLANGMessage (outXLANGMsg): here, the outgoing message (which we build in the Message Construction shape), msgAggregate, which is a System.Xml.XmlDocument.
At the end, we assign the message to msgArchiveInvoices and we send it to the send port.
Once everything has been deployed in BizTalk, we create a physical receive port and a physical send port, and we bind them to the orchestration’s logical ports.
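With these three parameters, the call made in the Message Assignment shape can be sketched as below. This is a hedged reconstruction based on the parameter list above, not the original code; in particular, the pipeline type may need to be qualified with the namespace of the project that contains it.

```csharp
// Message Assignment shape ExecuteSendPipeline (XLANG/s), inside the
// Message Construction shape that builds msgAggregate.
// Assumption: AddInvoicesSendPipeline resolves without a namespace prefix here;
// otherwise write typeof(<project namespace>.AddInvoicesSendPipeline).
Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline(
    typeof(AddInvoicesSendPipeline),  // send pipeline to execute
    vmsgToAggregate,                  // collection of invoices to aggregate
    msgAggregate);                    // receives the aggregated message
```

The call comes after the initialization of msgAggregate, so that the pipeline has a message to fill.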
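This final assignment could look like the following line, assuming msgArchiveInvoices is a message typed with the ArchiveInvoice schema and built in its own Message Construction shape:

```csharp
// Message Assignment shape (XLANG/s). Assumed content.
// Gives the untyped aggregated XML the type of the ArchiveInvoice envelope message.
msgArchiveInvoices = msgAggregate;
```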
We can now test our process with the following sample invoice:
We are going to send a number of messages (three invoices, in this case) within 30 seconds of one another. The message below is the output after aggregation:
As you can see, we have obtained an aggregate message of several individual messages.
To find out more
As you have seen, the structure of the output message is not very satisfactory, because a Body tag is used as a container for the individual messages. With an intermediate step, we can remove this level of tag. Simply use two maps as below (or a single map if it is written entirely in XSLT).
We can model an intermediate schema, InvoicesBody, which is an envelope schema, and import the individual Invoice schema into its Body tag.
We are going to build one invoice that includes all the invoices. The schema is shown below:
In the Invoices.xsd schema, we import the Invoice schema and we set the node as “unbounded”.
We first create the following map:
We use a “Mass Copy” to give the msgArchiveInvoices message a type, because the Body tag in this message is of type AnyType. The result of this map is shown below:
We then create the following map:
This time, we skipped the “Body” tag and obtained a clean envelope message:
Just add these two transformations at the end of the orchestration to get the following final result:
When more than one message is processed in one orchestration instance, the following error might be displayed:
0xC0C01B4C The instance completed without consuming all of its messages. The instance and its unconsumed messages have been suspended.().
This is a “zombie” message. A future blog article will discuss how to handle this type of message.
More information about zombie messages: Zombies in BizTalk Server.