上篇博文中我們介紹了Azure Messaging的重復(fù)消息機(jī)制、At most once 和At least once.

 Azure Messaging-ServiceBus Messaging消息隊(duì)列技術(shù)系列5-重復(fù)消息:at-least-once at-most-once

本文中我們主要研究并介紹Azure Messaging的消息回執(zhí)機(jī)制:實(shí)際應(yīng)用場景:

同步收發(fā)場景下,消息生產(chǎn)者和消費(fèi)者雙向應(yīng)答模式,例如:張三寫封信送到郵局中轉(zhuǎn)站,然后李四從中轉(zhuǎn)站獲得信,然后在寫一份回執(zhí)信,放到中轉(zhuǎn)站,然后張三去取,當(dāng)然張三寫信的時(shí)候就得寫明回信地址。還

有,生成訂單編號(hào)場景,發(fā)送一個(gè)生成訂單編號(hào)的消息,消息消費(fèi)者接收生成訂單編號(hào)的消息,并通過消息回執(zhí)返回。

Azure Messaging的消息回執(zhí)機(jī)制主要通過:基于帶會(huì)話的Queue/Topic、SessionId、ReplyTo屬性來實(shí)現(xiàn)

在代碼實(shí)現(xiàn)中,我們需要:

1. 兩個(gè)工作線程,一個(gè)線程用于消息發(fā)送和接收回執(zhí)消息,一個(gè)線程用于消息接收和發(fā)送消息回執(zhí)。

2. 一個(gè)會(huì)話標(biāo)識(shí):ReceiptSession  

3. 兩個(gè)隊(duì)列Queue:RequestQueue:發(fā)送消息、接收消息,ResponseQueue:發(fā)送回執(zhí)消息,接收回執(zhí)消息。

直接Show Code:

首先,我們?cè)赟erviceBusMQManager增加一個(gè)線程安全的創(chuàng)建帶回話的QueueClient方法:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

private static object syncObj = new object();        /// <summary>
        /// 獲取要求會(huì)話帶Session的QueueClient        /// </summary>
        /// <param name="queueName">隊(duì)列名稱</param>
        /// <returns>QueueClient</returns>
        public QueueClient GetSessionQueueClient(string queueName)
        {            var namespaceClient = NamespaceManager.Create();            if (!namespaceClient.QueueExists(queueName))
            {                lock (syncObj)
                {                    if (!namespaceClient.QueueExists(queueName))
                    {                        var queue = new QueueDescription(queueName) { RequiresSession = true };
                        namespaceClient.CreateQueue(queue);
                    }
                }
            }            return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete);
        }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

然后我們定義一些常量:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

        private static readonly string ReplyToSessionId = "ReceiptSession";        const double ResponseMessageTimeout = 20.0;        private static readonly string requestQueueName = "RequestQueue";        private static readonly string responseQueueName = "ResponseQueue";

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

實(shí)現(xiàn)發(fā)送并接收回執(zhí)消息的方法:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

        /// <summary>
        /// 發(fā)送并接收回執(zhí)消息        /// </summary>
        /// <param name="bills"></param>
        public static void SendMessage()
        {            var manager = new ServiceBusUtils();            var responseClient = manager.GetSessionQueueClient(responseQueueName);            var requestClient = manager.GetSessionQueueClient(requestQueueName);            var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);            var order = CreateSalesOrder(1);            //發(fā)送消息
            var message = new BrokeredMessage(order);
            message.Properties.Add("Type", order.GetType().ToString());
            message.SessionId = ReplyToSessionId;
            message.MessageId = "OrderMessage001";
            message.ReplyTo = responseQueueName;
            requestClient.Send(message);
            Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);            //接收消息回執(zhí)
            var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));            var receivedOrder = receivedMessage.GetBody<SalesOrder>();
            Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
            messsageReceiver.Close();
        }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

實(shí)現(xiàn)接收消息并發(fā)送回執(zhí)方法:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

 1         /// <summary> 2         /// 接收消息并回執(zhí) 3         /// </summary> 4         public static void ReceiveMessage() 5         { 6             var manager = new ServiceBusUtils(); 7  8             var requestClient = manager.GetSessionQueueClient(requestQueueName); 9             var session = requestClient.AcceptMessageSession();10             var requestMessage = session.Receive();11            12             if (requestMessage != null)13             {14                 var receivedOrder = requestMessage.GetBody<SalesOrder>();15                 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);16 17                 var responseMessage = new BrokeredMessage(receivedOrder);18                 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());19                 responseMessage.ReplyToSessionId = ReplyToSessionId;20                 responseMessage.MessageId = "ResponseOrderMessage001";21                 responseMessage.SessionId = requestMessage.SessionId;22                23                 //發(fā)送回執(zhí)消息24                 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);25                 responseClient.Send(responseMessage);26                 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);               
27             }28         }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

Main方法中,啟動(dòng)兩個(gè)工作線程:一個(gè)線程用于消息發(fā)送和接收回執(zhí)消息,一個(gè)線程用于消息接收和發(fā)送消息回執(zhí)。

因?yàn)樯婕暗紸zure Messaging中隊(duì)列的第一次創(chuàng)建,Azure Messaging是不支持多個(gè)請(qǐng)求同時(shí)創(chuàng)建同一個(gè)隊(duì)列的,因此,我們兩個(gè)線程間做一個(gè)簡單的Task.Delay(3000).Wait();

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

 1         static void Main(string[] args) 2         { 3             var sendTask = Task.Factory.StartNew(() => { SendMessage(); }); 4             Task.Delay(3000).Wait(); 5             var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); }); 6  7             Task.WaitAll(sendTask, receiveTask); 8  9             Console.ReadKey();           
10         }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

我們看看程序輸出:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

Azure 服務(wù)總線中的隊(duì)列:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

可以看出:Azure Messaging-ServiceBus Messaging 基于帶會(huì)話的Queue/Topic、SessionId、ReplyTo屬性來實(shí)現(xiàn)消息回執(zhí)機(jī)制。

http://www.cnblogs.com/tianqing/p/6607682.html