上篇博客中,我們用實(shí)際的業(yè)務(wù)場(chǎng)景和代碼示例了Azure Messaging-ServiceBus Messaging對(duì)復(fù)雜對(duì)象消息的支持和消息的持久化:

Azure Messaging-ServiceBus Messaging消息隊(duì)列技術(shù)系列4-復(fù)雜對(duì)象消息是否需要支持序列化和消息持久化

本文中我們主要研究并介紹Azure Messaging對(duì)重復(fù)消息的支持。

MessageReceiver 對(duì)象創(chuàng)建時(shí)可以指定消息接收模式: ReceiveAndDelete 和 PeekLock (默認(rèn)),其中:

1. 使用 ReceiveAndDelete 模式時(shí),接收是單步操作,即當(dāng) Service Bus 收到請(qǐng)求時(shí),它將消息標(biāo)記為“正在使用”,然后將其返回給應(yīng)用程序。ReceiveAndDelete 模式是最簡(jiǎn)

單的模型,并且最適合在出現(xiàn)故障時(shí)應(yīng)用程序能夠容許不處理消息的場(chǎng)景。理解此模式時(shí),可考慮這種情況:使用者發(fā)出了接收請(qǐng)求,但在處理消息之前發(fā)生崩潰。由于 Service B

us已將消息標(biāo)記為“正在使用”,因此當(dāng)應(yīng)用程序重新啟動(dòng)并重新開始使用消息時(shí),它就會(huì)錯(cuò)過(guò)在崩潰前已使用的消息。

2. 在 PeekLock 模式下,接收變成兩階段操作,因此可以支持不能容許錯(cuò)過(guò)消息的應(yīng)用程序。當(dāng) Service Bus 收到請(qǐng)求時(shí),它會(huì)找到下一條要使用的消息,將其鎖定以防止其他使

用者接收它,然后將其返回給應(yīng)用程序。應(yīng)用程序完成消息處理(或?qū)⑾⒖煽康卮鎯?chǔ)以便將來(lái)處理)后,會(huì)對(duì)收到的消息調(diào)用 Complete 以完成接收過(guò)程的第二階段。當(dāng)Service

Bus 看到 Complete 時(shí),會(huì)將該消息標(biāo)記為“正在使用”。另外兩個(gè)結(jié)果也是可能的。第一個(gè)結(jié)果,如果由于某種原因應(yīng)用程序無(wú)法處理該消息,它可以對(duì)收到的消息Abandon(而

不是 Complete)。這將導(dǎo)致 Service Bus 解鎖該消息,并使該消息可以重新被同一使用者或其他競(jìng)爭(zhēng)的使用者接收。第二個(gè)結(jié)果,即存在與鎖定關(guān)聯(lián)的超時(shí),如果應(yīng)用程序在鎖

定超時(shí)到期前無(wú)法處理改消息(例如,應(yīng)用程序崩潰)則 Service Bus 將解鎖該消息并使其可以重新被接收。如果應(yīng)用程序在處理該消息后崩潰,但此時(shí)尚未發(fā)出 Complete 請(qǐng)

求,則在應(yīng)用程序重新啟動(dòng)時(shí),該消息將重新傳遞給應(yīng)用程序。這通常稱為“至少一次”處理。這意味著每條消息都將至少處理一次,但在某些情況下可能會(huì)重新傳遞同一消息。如果

方案不能容許重復(fù)處理,則需要在應(yīng)用程序中添加檢測(cè)重復(fù)項(xiàng)的邏輯。這可以基于消息的 MessageId 屬性來(lái)實(shí)現(xiàn)。此屬性的值在傳遞嘗試過(guò)程中保持不變。這稱為“恰好一次”處、

理。

接下來(lái),我們通過(guò)Code show一下消息的重復(fù)發(fā)送和重復(fù)接收。

消息重復(fù)發(fā)送:同一個(gè)消息BrokeredMessage發(fā)送兩次

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

/// <summary>
        /// 發(fā)送消息        /// </summary>
        private static void MessageMultiSendTest()
        {            var sbUtils = new ServiceBusUtils();            //創(chuàng)建隊(duì)列
            sbUtils.CreateQueue(queueName, false);            //多次發(fā)送消息到OrderQueue
            var queueSendClient = sbUtils.GetQueueClient(queueName);            var order = CreateSalesOrder(1);            var message = sbUtils.Create(order);            queueSendClient.Send(message);
            queueSendClient.Send(message);

            Console.WriteLine("Send Completed!");
        }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

實(shí)際執(zhí)行過(guò)程中是出錯(cuò)的:

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

由此可以得出:

 

Azure Messaging 不支持同一個(gè)消息發(fā)送多次,必須通過(guò)new多個(gè)BrokeredMessage實(shí)例實(shí)現(xiàn)。

同時(shí),消息的唯一性由消息的MessageID來(lái)標(biāo)識(shí)!

PeekAndLock模式下消息的重復(fù)接收:

接收模式PeekAndLock,同一個(gè)隊(duì)列,第一個(gè)Consumer接收消息,但是不Complete;然后第二個(gè)Consumer繼續(xù)接收消息,此時(shí)第一個(gè)Consumer未Complete的消息有一個(gè)

TTL,在TTL時(shí)間區(qū)間之內(nèi),第二個(gè)Consumer可以繼續(xù)接收當(dāng)前隊(duì)列未鎖定的消息,當(dāng)TTL時(shí)間到達(dá)后,釋放第一個(gè)Consumer鎖定的消息,第二個(gè)Consumer讀取到了第一個(gè)

Consumer未Complete的消息。

The duration of a peek lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes;

the default value is 1 minute.

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

 /// <summary>
        /// 接收消息        /// </summary>
        private static void MessageReceive()
        {            int index = 0;            
            var sbUtils = new ServiceBusUtils();            var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);            for (int i = 0; i < 10; i++)
            {                var msg = queueReveiveClient1.Peek();
                Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
            }            var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);            for (int i = 0; i < 10; i++)
            {                var msg = queueReveiveClient2.Receive();
                Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
                msg.Complete();
            }            ////刪除隊(duì)列
            //sbUtils.DeleteQueue(queueName);
            Console.WriteLine("Receive Completed!");
        }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

第一個(gè)隊(duì)列Consumer使用Peek模式接收到消息,只是取出消息,不從消息隊(duì)列中移出。

第二個(gè)隊(duì)列Consumer使用Receive模式同樣可以接收消息。

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

如果兩個(gè)隊(duì)列Consumer都使用Receive模式接收消息,只有第一個(gè)Consumer可以接收到,第二個(gè)Consumer則接收不到,一直在等待消息的入隊(duì)!

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

 /// <summary>
        /// 接收消息        /// </summary>
        private static void MessageReceive()
        {            int index = 0;            
            var sbUtils = new ServiceBusUtils();            var queueReveiveClient1 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);            for (int i = 0; i < 10; i++)
            {                var msg = queueReveiveClient1.Receive();
                Console.WriteLine(string.Format("Received {0} MessageID: {1}", i, msg.MessageId));
            }            var queueReveiveClient2 = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.PeekLock);            for (int i = 0; i < 10; i++)
            {                var msg = queueReveiveClient2.Receive();
                Console.WriteLine(string.Format("Second received {0} MessageID: {1}", i, msg.MessageId));
                msg.Complete();
            }            ////刪除隊(duì)列
            //sbUtils.DeleteQueue(queueName);
            Console.WriteLine("Receive Completed!");
        }

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

photoshop培訓(xùn),電腦培訓(xùn),電腦維修培訓(xùn),移動(dòng)軟件開發(fā)培訓(xùn),網(wǎng)站設(shè)計(jì)培訓(xùn),網(wǎng)站建設(shè)培訓(xùn)

因?yàn)橄⒁呀?jīng)被第一個(gè)Consumer消費(fèi)。

通過(guò)本篇我們了解了Azure Messaging-ServiceBus Messaging對(duì)重復(fù)消息的處理機(jī)制。

http://www.cnblogs.com/tianqing/p/6561635.html