前言

介紹

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標(biāo)準(zhǔn)socket接口的擴展。它提供了一種異步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協(xié)議的無縫訪問。
當(dāng)前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。

zeromq的英文文檔
NetMQ的英文文檔

目的

對NetMQ的源碼進行學(xué)習(xí)并分析理解,因此寫下該系列文章,本系列文章暫定編寫計劃如下:

  1. 消息隊列NetMQ 原理分析1-Context和ZObject

  2. 消息隊列NetMQ 原理分析2-IO線程和完成端口

  3. 消息隊列NetMQ 原理分析3-命令產(chǎn)生/處理、創(chuàng)建Socket和回收線程

  4. 消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe

  5. 消息隊列NetMQ 原理分析5-Engine,Encord和Decord

  6. 消息隊列NetMQ 原理分析6-TCP和Inpoc實現(xiàn)

  7. 消息隊列NetMQ 原理分析7-Device

  8. 消息隊列NetMQ 原理分析8-不同類型的Socket

  9. 消息隊列NetMQ 原理分析9-實戰(zhàn)

友情提示: 看本系列文章時最好獲取源碼,更有助于理解。


Socket

上一章最后我們簡單介紹了SocketBaseSessionBase的創(chuàng)建和回收,這一張我們詳細介紹SocketBaseSessionBase。
首先SocketBase繼承自Own,即也是ZObject對象,同時由于SocketBase需要進行消息的傳輸,因此它實現(xiàn)了一些結(jié)構(gòu),包括IPollEvents、Pipe.IPipeEvents。

接口實現(xiàn)

internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
    ...
}
  • IPollEvents事件上一章回收線程已經(jīng)介紹過,這里不再做過多說明了,簡單講SocketBase實現(xiàn)該事件只有在回收線程回收Socket的時候會觸發(fā)。

  • Pipe.IPipeEvents:是管道事件,它的簽名如下

    public interface IPipeEvents{void ReadActivated([NotNull] Pipe pipe);void WriteActivated([NotNull] Pipe pipe);void Hiccuped([NotNull] Pipe pipe);void Terminated([NotNull] Pipe pipe);
    }
  • ReadActivated:表示管道可讀,管道實際調(diào)用SocketBaseSessionBaseReadActivated方法,而SocketBase實際會調(diào)用XReadActivated方法。

  • WriteActivated:表示管道可寫,管道實際調(diào)用SocketBaseSessionBaseWriteActivated方法,而SocketBase實際會調(diào)用XWriteActivated方法。

  • Hiccuped:當(dāng)連接突然中斷時會調(diào)用此方法。

  • WriteActivated:表示管道終止。

內(nèi)部結(jié)構(gòu)

SocketBase的內(nèi)部維護著一個字段,用于存放連接/綁定地址和它的管道(若當(dāng)前SocketBaseTCPListener,則無需初始化管道,管道為空)。

private readonly Dictionary m_endpoints = new Dictionary();private readonly Dictionary m_inprocs = new Dictionary();

Endpoint對象用于存放SessionBasePipeListener的引用

private class Endpoint{    public Endpoint(Own own, Pipe pipe)    {
        Own = own;
        Pipe = pipe;
    }    public Own Own { get; }    public Pipe Pipe { get; }
}

當(dāng)SocketBase連接或綁定最后會向?qū)?code style="margin: 1px 5px; padding: 0px 5px !important; line-height: 1.8; vertical-align: middle; display: inline-block; font-family: ">Endpoint保存到字典中

private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe){
    LaunchChild(endpoint);
    m_endpoints[address] = new Endpoint(endpoint, pipe);
}

SocketBase斷開連接時會移除它

public void TermEndpoint([NotNull] string addr){
    ...    if (protocol == Address.InProcProtocol)
    {
        ...
        m_inprocs.Remove(addr);
    }    else
    {
        ...
        m_endpoints.Remove(addr);
    }
}

m_inprocs也是一個字典用于存放inproc協(xié)議的連接。
第一章創(chuàng)建SocketBase我們介紹了Context創(chuàng)建SocketBase所做的一些工作,初始化SocketBase時,會創(chuàng)建MailBox,用于傳輸Command。

protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
    : base(parent, threadId){
    m_options.SocketId = socketId;
    m_mailbox = new Mailbox("socket-" + socketId);
}

每個SocketBase的命令處理實際都是在工作線程中進行。因此理論上(忽略線程上下文切換時造成的性能損失)線程數(shù)越多,NetMQ的IO吞吐量和工作線程數(shù)成正比關(guān)系。
Context創(chuàng)建SocketBase會根據(jù)Create靜態(tài)方法根據(jù)不同類型創(chuàng)建不同的SocketBase

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{    switch (type)
    {        case ZmqSocketType.Pair:
            return new Pair(parent, threadId, socketId);        case ZmqSocketType.Pub:
            return new Pub(parent, threadId, socketId);        case ZmqSocketType.Sub:
            return new Sub(parent, threadId, socketId);        case ZmqSocketType.Req:
            return new Req(parent, threadId, socketId);        case ZmqSocketType.Rep:
            return new Rep(parent, threadId, socketId);        case ZmqSocketType.Dealer:
            return new Dealer(parent, threadId, socketId);        case ZmqSocketType.Router:
            return new Router(parent, threadId, socketId);        case ZmqSocketType.Pull:
            return new Pull(parent, threadId, socketId);        case ZmqSocketType.Push:
            return new Push(parent, threadId, socketId);        case ZmqSocketType.Xpub:
            return new XPub(parent, threadId, socketId);        case ZmqSocketType.Xsub:
            return new XSub(parent, threadId, socketId);        case ZmqSocketType.Stream:
            return new Stream(parent, threadId, socketId);        default:
            throw new InvalidException("SocketBase.Create called with invalid type of " + type);
    }
}

具體創(chuàng)建SocketBase的工作在上一章已經(jīng)做了詳細的介紹,這里不再復(fù)述。

Session

首先和SocketBase一樣,SessionBase也繼承自Own,即也是ZObject對象,同時由于SessionBaseSocketBase存在消息傳輸,所以它也實現(xiàn)了IPipeEvents接口,同時它實現(xiàn)了IProactorEvents接口,在消息收發(fā)是會接收到通知。SessionBase一端和SocketBase進行消息的通訊,另一端和Engine存在消息通訊,它實現(xiàn)了IMsgSinkIMsgSource接口和Engine進行消息傳輸。

 internal class SessionBase : Own,        Pipe.IPipeEvents, IProactorEvents,        IMsgSink, IMsgSource{

        }
internal interface IMsgSink{    ///     /// 傳輸消息.成功時返回true.
    ///     /// 將msg消息寫入到管道中    bool PushMsg(ref Msg msg);
}
internal interface IMsgSource{    ///     /// 取一個消息。成功時返回,從管道獲取消息寫入msg參數(shù)中;若失敗則返回false,將null寫入到msg參數(shù)中。
    ///     /// 從管道獲取消息寫入Msg中    /// true if successful - and writes the message to the msg argument    bool PullMsg(ref Msg msg);
}

青軟培訓(xùn),Java培訓(xùn),軟件培訓(xùn),Java培訓(xùn)機構(gòu),Java培訓(xùn)學(xué)校,萬碼學(xué)堂,電腦培訓(xùn),計算機培訓(xùn),Java培訓(xùn),JavaEE開發(fā)培訓(xùn),青島軟件培訓(xùn),軟件工程師培訓(xùn)

當(dāng)SocketBase將消息寫入到寫管道時,對應(yīng)的SessionBase會從讀管道讀到SocketBase寫入的數(shù)據(jù),然后將數(shù)據(jù)從管道取出生成一個Msg,Engine會和AsyncSocket交互傳輸數(shù)據(jù),關(guān)于Engine下一章再做介紹。

Option

option參數(shù)如下

  1. Affinity
    表示哪個線程是可用的,默認(rèn)為0,表示所有線程在負(fù)載均衡都可使用。

  2. Backlog
    最大Socket待連接數(shù)

  3. DelayAttachOnConnect
    在創(chuàng)建連接時,延遲在SocketSession之間創(chuàng)建雙向的管道,默認(rèn)創(chuàng)建連接時立即創(chuàng)建管道

  4. DelayOnClose
    若為true,則在Socket關(guān)閉時Session先從管道接收所有消息發(fā)送出去。
    否則直接關(guān)閉,默認(rèn)為true。

  5. DelayOnDisconnect
    若為true,則在Pipe通知我們中斷時Socket先將接收所有入隊管道消息。
    否則直接中斷管道。默認(rèn)為true.

  6. Endianness
    字節(jié)序,數(shù)據(jù)在內(nèi)存中是高到低排還是低到高排。

  7. Identity
    響應(yīng)的Identity,每個Identity用于查找Socket。Identiy是一個重復(fù)的隨機32位整形數(shù)字,轉(zhuǎn)換為字節(jié)5位字節(jié)數(shù)組。每個消息的第一部分是Identity,

  8. IdentitySize
    1個字節(jié)用于保存Identity的長度。

  9. IPv4Only

  10. Linger
    當(dāng)Socket關(guān)閉時,是否延遲一段時間等待數(shù)據(jù)發(fā)送完畢后再關(guān)閉管道

  11. MaxMessageSize
    每個消息包最大消息大小

  12. RawSocket
    若設(shè)置為true,RouterSocket可以接收非NetMQ發(fā)送來的tcp連接。
    默認(rèn)是false,Stream在構(gòu)造函數(shù)時會設(shè)置為true,設(shè)置為true時會將RecvIdentity修改為false(用NetMQ接收其他系統(tǒng)發(fā)送來的Socket請求應(yīng)該用StreamSocekt,否則由于應(yīng)用層協(xié)議不一樣可能會導(dǎo)致一些問題。)

  13. RecvIdentity
    若為true,Identity轉(zhuǎn)發(fā)給Socket。

  14. ReconnectIvl
    設(shè)置最小重連時間間隔,單位ms。默認(rèn)100ms

  15. ReconnectIvlMax
    設(shè)置最大重連時間間隔,單位ms。默認(rèn)0(無用)

  16. RecoveryIvl
    PgmSocket用的

  17. SendBuffer
    發(fā)送緩存大小,設(shè)置底層傳輸Socket的發(fā)送緩存大小,初始為0

  18. ReceiveBuffer
    接收緩存大小,設(shè)置底層傳輸Socket的接收緩存大小,初始為0

  19. SendHighWatermark
    Socket發(fā)送的管道的最大消息數(shù),當(dāng)發(fā)送水位達到最大時會阻塞發(fā)送。

  20. ReceiveHighWatermark
    Socket接收管道的最大消息數(shù)

  21. SendLowWatermark
    Socket發(fā)送低水位,消息的最小數(shù)量單位,每次達到多少消息數(shù)量才向Session管道才激活寫事件。默認(rèn)1000

  22. ReceiveLowWatermark
    Socket接收低水位,消息的最小數(shù)量單位,每次達到多少消息數(shù)量Session管道才激活讀事件。默認(rèn)1000

  23. SendTimeout
    Socket發(fā)送操作超時時間

  24. TcpKeepalive
    TCP保持連接設(shè)置,默認(rèn)-1不修改配置

  25. TcpKeepaliveIdle
    TCP心跳包在空閑時的時間間隔,默認(rèn)-1不修改配置

  26. TcpKeepaliveIntvl
    TCP心跳包時間間隔,默認(rèn)-1不修改配置

  27. DisableTimeWait
    客戶端斷開連接時禁用TIME_WAIT TCP狀態(tài)

Pipe

上一章我們講到過在SocketBaseSessionBase是通過2條單向管道進行消息傳輸,傳輸?shù)南挝皇?code style="margin: 1px 5px; padding: 0px 5px !important; line-height: 1.8; vertical-align: middle; display: inline-block; font-family: ">Msg,消息管道是YPipe類型,那么YPipe<>又是什么呢?

YPipe

Ypipe內(nèi)部實際維護這一個YQueue類型的先進先出隊列,YPipe向外暴露了一下方法:

  1. TryRead
    該方法用于判斷當(dāng)前隊列是否可讀,可讀的話第一個對象出隊

    public bool TryRead(out T value){if (!CheckRead())
    {    value = default(T);    return false;
    }value = m_queue.Pop();return true;
    }
  2. Unwrite
    取消寫入消息

    public bool Unwrite(ref T value){if (m_flushToIndex == m_queue.BackPos)    return false;value = m_queue.Unpush();return true;
    }
  3. 寫入消息
    將消息寫入到隊列中,若寫入未完成則當(dāng)前消息的指針?biāo)饕赶虍?dāng)前隊列塊的后一位。

    public void Write(ref T value, bool incomplete){
    m_queue.Push(ref value);// Move the "flush up to here" pointer.if (!incomplete)
    {
        m_flushToIndex = m_queue.BackPos;
    }
    }
  4. 完成寫入
    當(dāng)該部分消息寫完時,則會調(diào)用Flush完成寫入并通知另一個管道消息可讀

    public void Flush(){if (m_state == State.Terminating)    return;if (m_outboundPipe != null && !m_outboundPipe.Flush())
        SendActivateRead(m_peer);
    }

Msg

寫入的消息單位是Msg,它實現(xiàn)了多條數(shù)據(jù)的存儲,當(dāng)每次數(shù)據(jù)寫完還有數(shù)據(jù)帶寫入時通過將Flag標(biāo)記為More表示消息還沒寫入完。

YQueue

YQueue是由一個個trunk組成的,每個trunk就是一個消息塊,每個消息塊可能包含多個Msg,主要由寫入消息時是否還有更多消息帶寫入(Flag)決定。trunk是一個雙向循環(huán)鏈表,內(nèi)部維護著一個數(shù)組用于存放數(shù)據(jù),每個數(shù)據(jù)會有2個指針,分別指向前一個塊和后一個塊,每個塊還有一個索引,表示當(dāng)前塊在隊列中的位置。

private sealed class Chunk{    public Chunk(int size, int globalIndex)    {
        Values = new T[size];
        GlobalOffset = globalIndex;
        Debug.Assert(Values != null);
    }    
    /// 數(shù)據(jù)    public T[] Values { get; }    
    /// 當(dāng)前塊在隊列中的位置    public int GlobalOffset { get; }    /// 前一個塊    [CanBeNull]    public Chunk Previous { get; set; }    /// 下一個塊    [CanBeNull]    public Chunk Next { get; set; }
}

每個chunk默認(rèn)最多可保存256個部分。
由于每次向SocketBase寫入的Msg可能有多個部分,因此消息會寫入到數(shù)組中,所有消息寫完后指向trunk的指針才會后移一位。
YQueue有以下字段

//用于記錄當(dāng)前塊消息的個數(shù),默認(rèn)為256private readonly int m_chunkSize;// 當(dāng)隊列是空的時,下一個塊指向null,首尾塊都指向初始化的一個塊,開始位置的塊僅用于隊列的讀取(front/pop),最后位置的僅用于隊列的寫入(back/push)。// 開始位置private volatile Chunk m_beginChunk;//chunk的當(dāng)前可讀位置索引private int m_beginPositionInChunk;//指向后一個塊private Chunk m_backChunk;//chunk的最后一個可讀位置索引private int m_backPositionInChunk;//指向后一個塊private Chunk m_endChunk;//chunk的下一個可寫位置索引private int m_endPosition;//當(dāng)達到最大Msg數(shù)量時,擴展一個chunk,最大為256個塊private Chunk m_spareChunk;

當(dāng)前trunk頭部在整個隊列中的的索引位置private int m_nextGlobalIndex;

YPipe寫入Msg實際是向YQueue入隊

public void Push(ref T val){
    m_backChunk.Values[m_backPositionInChunk] = val;    //指向后一個塊
    m_backChunk = m_endChunk;    //索引更新到最后可讀位置
    m_backPositionInChunk = m_endPosition;    //下一個可寫位置向后移動一位
    m_endPosition++;    if (m_endPosition != m_chunkSize)        return;    //到達最后一個位置則需要擴充一個塊
    Chunk sc = m_spareChunk;    if (sc != m_beginChunk)
    {        //已經(jīng)擴充了塊則更新下一個塊的位置
        m_spareChunk = m_spareChunk.Next;
        m_endChunk.Next = sc;
        sc.Previous = m_endChunk;
    }    else
    {        //新建一個塊,并更新索引位置
        m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
        m_nextGlobalIndex += m_chunkSize;
        m_endChunk.Next.Previous = m_endCh

http://www.cnblogs.com/Jack-Blog/p/7117798.html