過了個春節(jié),回到公司的成小胖變成了成大胖。但是你們千萬別以為他那個大肚子里面裝的都是肥肉,里面的墨水也多了不少嘞,畢竟成小胖利用春節(jié)的半個月時間專心學習并研究了 ActiveMQ,嘿嘿……
這不,為了檢驗下自己的學習成果,上班的第一天成小胖就去找架構師老王交流 ActiveMQ 相關的知識,還順便向老王討了個紅包,可把成小胖給高興壞了。
“來,根據(jù)你的了解說下 ActiveMQ 是什么?!?/p>
“這個簡單,ActiveMQ 是一個 MOM,具體來說是一個實現(xiàn)了 JMS 規(guī)范的系統(tǒng)間遠程通信的消息代理。它……”
“等等,先解釋下什么是 MOM?!?/p>
“好。MOM 就是面向消息中間件(Message-oriented middleware),是用于以分布式應用或系統(tǒng)中的異步、松耦合、可靠、可擴展和安全通信的一類軟件。MOM 的總體思想是它作為消息發(fā)送器和消息接收器之間的消息中介,這種中介提供了一個全新水平的松耦合?!?/span>
“JMS呢?”
成小胖是個追求極致的人,為了解釋得更通俗易懂,索性搬來一塊白板邊畫邊說。
“JMS 叫做 Java 消息服務(Java Message Service),是 Java 平臺上有關面向 MOM 的技術規(guī)范,旨在通過提供標準的產生、發(fā)送、接收和處理消息的 API 簡化企業(yè)應用的開發(fā),類似于 JDBC 和關系型數(shù)據(jù)庫通信方式的抽象。”
“嗯,很好。下面的這些概念你也需要特別理解下”:
Provider:純 Java 語言編寫的 JMS 接口實現(xiàn)(比如 ActiveMQ 就是)
Domains:消息傳遞方式,包括點對點(P2P)、發(fā)布/訂閱(Pub/Sub)兩種
Connection factory:客戶端使用連接工廠來創(chuàng)建與 JMS provider 的連接
Destination:消息被尋址、發(fā)送以及接收的對象
“你來說說這其中 P2P 和 Pub/Sub 的區(qū)別吧”,老王給成小胖拋出了一個問題。
成小胖可不是吃素的,畢竟要是吃素的話他也吃不到這么胖……這些基本概念對他來說都是小事一樁:
P2P (點對點)消息域使用 queue 作為 Destination,消息可以被同步或異步的發(fā)送和接收,每個消息只會給一個 Consumer 傳送一次。
Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通過使用MessageConsumer.setMessageListener() 注冊一個 MessageListener 實現(xiàn)異步接收。
多個 Consumer 可以注冊到同一個 queue 上,但一個消息只能被一個 Consumer 所接收,然后由該 Consumer 來確認消息。并且在這種情況下,Provider 對所有注冊的 Consumer 以輪詢的方式發(fā)送消息。
Pub/Sub(發(fā)布/訂閱,Publish/Subscribe)消息域使用 topic 作為 Destination,發(fā)布者向 topic 發(fā)送消息,訂閱者注冊接收來自 topic 的消息。發(fā)送到 topic 的任何消息都將自動傳遞給所有訂閱者。接收方式(同步和異步)與 P2P 域相同。
除非顯式指定,否則 topic 不會為訂閱者保留消息。當然,這可以通過持久化(Durable)訂閱來實現(xiàn)消息的保存。這種情況下,當訂閱者與 Provider 斷開時,Provider 會為它存儲消息。當持久化訂閱者重新連接時,將會受到所有的斷連期間未消費的消息。
“嗯,總結的很不錯,上面的這些知識是學習 ActiveMQ 的理論基礎,是必須要掌握的?!?/p>
“既然 JMS 是一個通用的規(guī)范,那么使用它創(chuàng)建應用程序肯定也有一個通用的步驟吧?”老王追問道。
“有的有的。要不您來說說這個通用步驟?就當我考考您,哈哈!”成小胖故作聰明,自以為老王作為架構師不會關注這些太具體的實現(xiàn)細節(jié)。
然而老王平日里親力親為,至今還常常擼代碼,怎么會被這種小 case 所難倒?于是老王分分鐘給出答案:
獲取連接工廠
使用連接工廠創(chuàng)建連接
啟動連接
從連接創(chuàng)建會話
獲取 Destination
創(chuàng)建 Producer,或
創(chuàng)建 Producer
創(chuàng)建 message
創(chuàng)建 Consumer,或發(fā)送或接收message發(fā)送或接收 message
創(chuàng)建 Consumer
注冊消息監(jiān)聽器(可選)
發(fā)送或接收 message
關閉資源(connection, session, producer, consumer 等)
“66666,厲害啊我的王哥!”成小胖的小聰明被老王擊得粉碎!
“你嘴皮子耍夠了吧,還是多動動手吧。現(xiàn)在你手寫上面步驟對應的代碼實現(xiàn)吧”,老王給了成小胖一個眼神,讓他自己慢慢體會……
成小胖也不是省油的燈,馬上擦干凈白板,現(xiàn)場擼了起來(是擼代碼,擼代碼,擼代碼,重要的事情說三遍):
public class JMSDemo { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; MessageConsumer consumer; Message message; boolean useTransaction = false; try { Context ctx = new InitialContext(); connectionFactory = (ConnectionFactory) ctx.lookup("ConnectionFactoryName"); //使用ActiveMQ時:connectionFactory = new ActiveMQConnectionFactory(user, password, getOptimizeBrokerUrl(broker)); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TEST.QUEUE"); //生產者發(fā)送消息 producer = session.createProducer(destination); message = session.createTextMessage("this is a test"); //消費者同步接收 consumer = session.createConsumer(destination); message = (TextMessage) consumer.receive(1000); System.out.println("Received message: " + message); //消費者異步接收 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null) { doMessageEvent(message); } } }); } catch (JMSException e) { ... } finally { producer.close(); session.close(); connection.close(); } }
老王滿意的點點頭:“還算不賴哈~ JMS 通用的規(guī)范咱們都聊完了,下面就來聊點 ActiveMQ 更具體點的東西咯?!?/p>
“好啊好啊。要不我先基于自己的學習講講 ActiveMQ 的存儲,您看看我哪里講的不對或者遺漏的,可好?”成小胖發(fā)揮了他一貫的積極主動的作風,當然內心里還是想得到老王的贊許。
“行,那就開始吧。”
ActiveMQ 在 queue 中存儲 Message 時,采用先進先出順序(FIFO)存儲。同一時間一個消息被分派給單個消費者,且只有當 Message 被消費并確認時,它才能從存儲中刪除。
對于持久化訂閱者來說,每個消費者獲得 Message 的副本。為了節(jié)省存儲空間,Provider 僅存儲消息的一個副本。持久化訂閱者維護了指向下一個 Message 的指針,并將其副本分派給消費者。以這種方式實現(xiàn)消息存儲,因為每個持久化訂閱者可能以不同的速率消費 Message,或者它們可能不是全部同時運行。此外,因每個 Message 可能存在多個消費者,所以在它被成功地傳遞給所有持久化訂閱者之前,不能從存儲中刪除。
“很好,上面這段知識非常重要。其實我們可以通過表格來更清晰地展示”,老王補充道,并在白板上畫了以下表格:
消息類型 | 是否持久化 | 是否有Durable訂閱者 | 消費者延遲啟動時,消息是否保留 | Broker重啟時,消息是否保留 |
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |
成小胖雖然對以上特性做過實踐對比,但是并沒有想到去畫一個表格出來使對比更加清晰易懂。特別是當他看到老王隨時就畫出這個表格時便驚嘆不已,大聲喊道:“老王你太牛了,真是愛死你了!”
周圍的同事聽到后,都齊刷刷的往這邊看過來。
此情此景,老王也不好意思了:“誒誒誒,說話注意哈,不要讓人覺得我們在搞基。回歸正題,你再說說 ActiveMQ 常用的存儲方式吧。”
成小胖羞澀的點點頭,迅速地回歸原態(tài),一五一十地說起來。
1.KahaDB
ActiveMQ 5.3 版本起的默認存儲方式。KahaDB存儲是一個基于文件的快速存儲消息,設計目標是易于使用且盡可能快。它使用基于文件的消息數(shù)據(jù)庫意味著沒有第三方數(shù)據(jù)庫的先決條件。
要啟用 KahaDB 存儲,需要在 activemq.xml 中進行以下配置:
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter> </broker>
2.AMQ
與 KahaDB 存儲一樣,AMQ存儲使用戶能夠快速啟動和運行,因為它不依賴于第三方數(shù)據(jù)庫。AMQ 消息存儲庫是可靠持久性和高性能索引的事務日志組合,當消息吞吐量是應用程序的主要需求時,該存儲是最佳選擇。但因為它為每個索引使用兩個分開的文件,并且每個 Destination 都有一個索引,所以當你打算在代理中使用數(shù)千個隊列的時候,不應該使用它。
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/kahadb" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter>
3.JDBC
選擇關系型數(shù)據(jù)庫,通常的原因是企業(yè)已經(jīng)具備了管理關系型數(shù)據(jù)的專長,但是它在性能上絕對不優(yōu)于上述消息存儲實現(xiàn)。事實是,許多企業(yè)使用關系數(shù)據(jù)庫作為存儲,是因為他們更愿意充分利用這些數(shù)據(jù)庫資源。
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
4.內存存儲
內存消息存儲器將所有持久消息保存在內存中。在僅存儲有限數(shù)量 Message 的情況下,內存消息存儲會很有用,因為 Message 通常會被快速消耗。在 activema.xml 中將 broker 元素上的 persistent 屬性設置為 false 即可。
<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker>
老王聽完后露出贊許的笑容:“連配置都能寫的這么詳細,看來你確實是做了不少功課,給你點個贊?!崩贤踅K究不會吝嗇自己的贊美,他也明白這些贊美對成小胖意味著什么。
成小胖得到了老王的贊賞,心里也是吃了蜜一般。
沒等成小胖說話,老王拿起筆走到白板前,說:“下面就根據(jù)我在工作中的經(jīng)歷,給你講講 ActiveMQ 的部署模式。”
1.單例模式
這個就不啰嗦了,略過。
2.無共享主從模式
這是最簡單的 Provider 高可用性的方案,主從節(jié)點分別存儲 Message。從節(jié)點需要配置為連接到主節(jié)點,并且需要特殊配置其狀態(tài)。
所有消息命令(消息,確認,訂閱,事務等)都從主節(jié)點復制到從節(jié)點,這種復制發(fā)生在主節(jié)點對其接收的任何命令生效之前。并且,當主節(jié)點收到持久消息,會等待從節(jié)點完成消息的處理(通常是持久化到存儲),然后再自己完成消息的處理(如持久化到存儲)后,再返回對 Producer 的回執(zhí)。
從節(jié)點不啟動任何傳輸,也不能接受任何客戶端或網(wǎng)絡連接,除非主節(jié)點失效。當主節(jié)點失效后,從節(jié)點自動成為主節(jié)點,并且開啟傳輸并接受連接。這是,使用 failover 傳輸?shù)目蛻舳司蜁B接到該新主節(jié)點。
Broker 連接配置如下:
failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false
但是,這種部署模式有一些限制,
主節(jié)點只會在從節(jié)點連接到主節(jié)點時復制其活動狀態(tài),因此當從節(jié)點沒有連接上主節(jié)點之前,任何主節(jié)點處理的 Message 或者消息確認都會在主節(jié)點失效后丟失。不過你可以通過在主節(jié)點設置 waitForSlave 來避免,這樣就強制主節(jié)點在沒有任何一個從節(jié)點連接上的情況下接受連接。
就是主節(jié)點只能有一個從節(jié)點,并且從節(jié)點不允許再有其他從節(jié)點。
把正在運行的單例配置成無共享主從,或者配置新的從節(jié)點時,你都要停止當前服務,修改配置后再重啟才能生效。
在可以接受一些故障停機時間的情況下,可以使用該模式。
從節(jié)點配置:
<services> <masterConnector remoteURI="tcp://remotehost:62001" userName="Rob" password="Davies"/> </services>
此外,可以配置 shutdownOnMasterFailure 項,表示主節(jié)點失效后安全關閉,保證沒有消息丟失,允許管理員維護一個新的從節(jié)點。
3.共享存儲主從模式
允許多個代理共享存儲,但任意時刻只有一個是活動的。這種情況下,當主節(jié)點失效時,無需人工干預來維護應用的完整性。另外一個好處就是沒有從節(jié)點數(shù)的限制。
有兩種細分模式:
(1)基于數(shù)據(jù)庫
它會獲取一個表上的排它鎖,以確保沒有其他 ActiveMQ 代理可以同時訪問數(shù)據(jù)庫。其他未獲得鎖的代理則處于輪詢狀態(tài),就會被當做是從節(jié)點,不會開啟傳輸也不會接受連接。
(2)基于文件系統(tǒng)
需要獲取分布式共享文件鎖,linux 系統(tǒng)下推薦用 GFS2。
看到這些干貨,成小胖欣喜若狂一邊聽一邊記,等老王講完后他還沒記完。而老王則趁機喝了杯鐵觀音潤潤嗓子。
在記錄完老王所講的部署模式后,成小胖也不好意思再讓老王繼續(xù)講下去了,畢竟他知道老王常年加班腰間盤突出,不能長時間站著。
“王哥您坐著休息下,我再給您講講我所理解的 ActiveMQ 的網(wǎng)絡連接,中不中?”
“中。沒事兒,我身體好著呢~”老王知道成小胖擔心他的腰,但他還是那個倔脾氣。成小胖也不敢多耽誤時間,立馬開講。
1.代理網(wǎng)絡
支持將 ActiveMQ 消息代理鏈接到不同拓撲,這就是被人們熟知的代理網(wǎng)絡。
ActiveMQ 網(wǎng)絡使用存儲和轉發(fā)的概念,其中消息總是存儲在本地代理中,然后通過網(wǎng)絡轉發(fā)到另一個代理。
當連接建立后,遠程代理將把包含其所有持久和活動消費者目的地的信息傳遞給本地代理,本地代理根據(jù)信息決定遠程代理感興趣的 Message 并將它發(fā)送給遠程代理。
如果希望網(wǎng)絡是雙向的,您可以使用網(wǎng)絡連接器將遠程代理配置為指向本地代理,或將網(wǎng)絡連接器配置為雙工,以便雙向發(fā)送消息。
<networkConnectors> <networkConnector uri="static://(tcp://backoffice:61617)" name="bridge" duplex="true" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false"> </networkConnector> </networkConnectors>
注意,配置的順序很重要:
1.網(wǎng)絡連接——需要在消息存儲前建立好連接,對應 networkConnectors 元素
2.消息存儲——需要在傳輸前配置好,對應 persistenceAdapter 元素
3.消息傳輸——最后配置,對應 transportConnectors 元素
2.網(wǎng)絡發(fā)現(xiàn)
(1)動態(tài)發(fā)現(xiàn)
使用多播來支持網(wǎng)絡動態(tài)發(fā)現(xiàn)。配置如下:
<networkConnectors> <networkConnector uri="multicast://default"/> </networkConnectors>
其中,multicast:// 中的默認名稱表示該代理所屬的組。因此使用此方式時,強烈推薦你使用一個獨特的組名,避免你的代理連接到其他不相關代理。
(2)靜態(tài)發(fā)現(xiàn)
靜態(tài)發(fā)現(xiàn)接受代理 URI 列表,并將嘗試按列表中確定的順序連接到遠程代理。
作者:cyfonly
出處:http://www.cnblogs.com/cyfonly/
本文版權歸作者和博客園共有,歡迎轉載,未經(jīng)同意須保留此段聲明,且在文章頁面明顯位置給出原文連接。歡迎指正與交流。
http://www.cnblogs.com/cyfonly/p/6380860.html