Biztalk : Pipeline Stamping Receive Message


This code allows you to stamp your incoming message with a XML element called SequenceNo. This sequence number gives you the ability to sequence without making changes to your existing application.  It is unique in the sense that it does not create a new part for you and there shouldn't be too much modification to your existing codes in order to implement it. 


Creating a pipeline in Biztalk Server is relatively easy. It requires you to

1) Implement specific Interface

2) Create a Biztalk Project that uses a pipeline Item type with extension Btp. Drag the assembly that you have created above into [validate] section.

3) Deploy and move it to the right application. You need to configure properly after this.


So first you have to come up with an assembly that overrides the Execute method as shown below. It returns a new IBaseMessage: 

 public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            int seq = 0;

            /// new message structure ///  
            IBaseMessageFactory msgFactory = pContext.GetMessageFactory(); 
            IBaseMessageContext context = pInMsg.Context;

            IBaseMessagePart OriginalBodyPart = pInMsg.BodyPart;
            Stream MessageStream = OriginalBodyPart.GetOriginalDataStream();
            
            //// Constructing a new message ////

            IBaseMessage msg = msgFactory.CreateMessage();


            
            IPipelineContextEx pcx = pContext as IPipelineContextEx;

            if (OriginalBodyPart != null)
            {
                
                
              #region [Debugging/Logging]
       //XmlDocument xDocOutput = new XmlDocument();
                //xDocOutput.Load(MessageStream);
                //XmlTextWriter writer = new XmlTextWriter("d:\\temp\\message.xml", System.Text.Encoding.UTF8);
                //writer.Formatting = Formatting.Indented;
                //xDocOutput.WriteTo(writer);
                //writer.Flush();
                //writer.Close(); 
       #endregion  
                
                if (pcx != null)
                {
                    ITransaction trans = null;
                    trans = (ITransaction)pcx.GetTransaction();

                    if (trans != null)
                    {
                        //// Get ConnectionString //// 

                        SqlConnection conn = new SqlConnection("Database=SequenceDB;Server=72.15.219.241;connect timeout=60;user id=es14703-dev;password=p0w3rOFdr3ams");
                        SqlCommand cmd = new SqlCommand();
                        cmd.CommandType = CommandType.StoredProcedure;
                        cmd.CommandText = "GetSequence";
                        
                        SqlParameter parameter = new SqlParameter("@ReceiveLocationName", SqlDbType.NVarChar, 50);
                        parameter.Value = "TestReceiveLocation"; 
                        cmd.Parameters.Add(parameter);


                        ///////////////////////////////////////////
                        /// System.IO.File.AppendAllText("d:\\temp\\mqdata.log", MessageStream.ToString());
                        /////////////////////////////////////////////

                        try
                        {
                            cmd.Connection = conn;
                            conn.Open(); 

                            //// Get the Data /// 
                            seq = Convert.ToInt32(cmd.ExecuteScalar());


                            IBaseMessagePart part = msgFactory.CreateMessagePart();

                            XmlDocument xdoc = new XmlDocument();
                            xdoc.Load(MessageStream);

                            XmlElement newElement = xdoc.CreateElement("SequenceNo");
                            newElement.InnerText = seq.ToString();
                            xdoc.LastChild.AppendChild(newElement);

                            byte[] dataBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.InnerXml);
                            MemoryStream stream = new MemoryStream();
                            stream.Write(dataBytes, 0, dataBytes.Length);

                            stream.Seek(0, SeekOrigin.Begin);

                            part.Data = stream;
                            msg.Context = context;

                            msg.AddPart("DMQMessageContractSend", part, true);

                            
                            IBaseMessagePart ExtendedPart = msgFactory.CreateMessagePart();
                            byte[] extendedContent = System.Text.Encoding.UTF8.GetBytes(string.Format("{0}", seq.ToString()));
                            
                            ExtendedPart.Data = new MemoryStream(extendedContent);
                            ExtendedPart.Charset = "utf-8";
                            ExtendedPart.ContentType = "text/xml";
                            msg.AddPart("Sequence", ExtendedPart, false);


                            #region [OldCodes]

                            ///  Stamp a sequence number to the message /// 

                            //XmlDocument xdoc = new XmlDocument();
                            //xdoc.Load(MessageStream);

                            //XmlElement newElement = xdoc.CreateElement("SequenceNo");
                            //newElement.InnerText = seq.ToString();
                            //xdoc.LastChild.AppendChild(newElement);

                            //byte[] dataBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.InnerXml);
                            //MemoryStream stream = new MemoryStream();

                            //stream.Write(dataBytes, 0, dataBytes.Length);

                            //System.IO.File.AppendAllText("d:\\temp\\msmqlog.log", "somedata gotta be here" +  System.Text.Encoding.UTF8.GetString(dataBytes));

                            //stream.Seek(0, SeekOrigin.Begin);
                            //bodyPart.Data.Seek(0, SeekOrigin.Begin);
                            //OriginalBodyPart.Data = stream;

                            //IBaseMessage outMsg;
                            //outMsg = pContext.GetMessageFactory().CreateMessage();
                            //outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
                            //outMsg.BodyPart.Data = stream;

                            //IBaseMessage outMsg;
                            //pInMsg = pContext.GetMessageFactory().CreateMessage();
                            //pInMsg.AddPart("Sequence", pContext.GetMessageFactory().CreateMessagePart(), true);
                            //pInMsg.BodyPart.Data = stream;

                            //bodyPart.Data.Write(dataBytes, 0, dataBytes.Length);

                            //byte[] outputDataBytes = new byte[bodyPart.Data.Length]; 
                            //int xlength  = System.Convert.ToInt32(bodyPart.Data.Length);
                            //bodyPart.Data.Read(outputDataBytes, 0, xlength);

                            //System.IO.File.AppendAllText("d:\\temp\\scanstream.log", "Output " + System.Text.Encoding.UTF8.GetString(outputDataBytes));

                            //// Increment and Update the data //// 

                            // context.Promote("messageData", "", seq.ToString());
                            // context.Write("messageData", "", seq.ToString());


                            #endregion                            

                            
                            SqlConnection SeqUpdateConn = new SqlConnection("Database=SequenceDB;Server=72.15.219.241;connect timeout=60;user id=es14703-dev;password=p0w3rOFdr3ams");

                            SqlCommand SeqUpdateCommand = new SqlCommand();
                            SeqUpdateCommand.CommandType = CommandType.StoredProcedure;
                            SeqUpdateCommand.CommandText = "UpdateSequence";

                            SqlParameter UpdateParameter = new SqlParameter("@ReceiveLocationName", SqlDbType.NVarChar, 50);
                            UpdateParameter.Value = "TestReceiveLocation";
                            SeqUpdateCommand.Parameters.Add(UpdateParameter);

                            SeqUpdateCommand.Connection = SeqUpdateConn;
                            SeqUpdateConn.Open();

                            SeqUpdateCommand.ExecuteNonQuery(); 


                            SeqUpdateCommand.Dispose();
                            SeqUpdateConn.Close();

                        }
                        catch (SqlException)
                        {
                            throw;
                        }
                        finally
                        {
                            cmd.Dispose();
                            conn.Close();
                        }

                    }
                }

            }


            /// Returning the newly constructed messeage part /// 

            return msg;


        }

Once you have this code, next is to create a new biztalk server project and use toolbox to Choose Item which can be added into your pipeline. 

In your orchestration,  reference can be made to this Sequence Number using the following codes:-

varStrSequenceNo = xpath(DMQMessage, "string(/*[local-name()='DMQMessageContractSend' and namespace-uri()='']/*[local-name()='SequenceNo' and namespace-uri()=''])");








Comments

Popular posts from this blog

The specified initialization vector (IV) does not match the block size for this algorithm