1概要設計
Kafka SocketServer是基于Java NIO來開發(fā)的,采用了Reactor的模式,其中包含了1個Acceptor負責接受客戶端請求,N個Processor負責讀寫數(shù)據(jù),M個Handler來處理業(yè)務邏輯。在Acceptor和Processor,Processor和Handler之間都有隊列來緩沖請求。
1.1 kafka.network.Acceptor
這個類繼承了AbstractServerThread,實現(xiàn)了Runnable接口,因此它是一個線程類。它的主要職責是監(jiān)聽客戶端的連接請求,并建立和客戶端的數(shù)據(jù)傳輸通道,然后為這個客戶端指定一個Processor,它的工作就到此結束,這樣它就可以去響應下一個客戶端的連接請求了。
它的run方法
的主要邏輯如下:
首先在ServerSocketChannel上注冊OP_ACCEPT事件:
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
然后開始等待客戶端的連接請求:
val ready = nioSelector.select(500)
如果有連接進來,則將其分配給當前的processor,并且把當前processor指向下一個processor,也就是說它采用了Round Robin的方式來選擇processor
:if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { val key = iter.next iter.remove() if (key.isAcceptable) accept(key, processors(currentProcessor)) // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } }
接下來看看Acceptor的accept方法的簡化代碼(省掉了異常處理)。先說點相關的知識,
SelectionKey是表示一個Channel和Selector的注冊關系。
在Acceptor中的nioSelector,只有監(jiān)聽客戶端連接請求的ServerSocketChannel的OP_ACCEPT事件注冊在上面。當nioSelector的select方法返回時,則表示注冊在它上面的Channel發(fā)生了對應的事件。在Acceptor中,這個事件就是OP_ACCEPT,表示這個ServerSocketChannel的OP_ACCEPT事件發(fā)生了。
因此,Acceptor的accept方法的處理邏輯為:
首先通過SelectionKey來拿到對應的ServerSocketChannel,并調用其accept方法來建立和客戶端的連接,然后拿到對應的SocketChannel并交給了processor。
然后Acceptor的任務就完成了,開始去處理下一個客戶端的連接請求。Processor的accept方法的邏輯將在下一節(jié)介紹。
/* * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) socketChannel.socket().setSendBufferSize(sendBufferSize) debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]" .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id, socketChannel.socket.getSendBufferSize, sendBufferSize, socketChannel.socket.getReceiveBufferSize, recvBufferSize)) processor.accept(socketChannel) }
1.2 kafka.network.Processor
Processor也是繼承自AbstractServerThread并實現(xiàn)Runnable接口,線程類。它負責從客戶端讀取數(shù)據(jù)和將響應返回給客戶端,不處理具體的業(yè)務邏輯。每個Processor都有一個Selector,用來監(jiān)聽多個客戶端,因此可以非阻塞地處理多個客戶端的讀寫請求。
1.2.1 處理新建立的連接
從上一節(jié)中可以看到,Acceptor會把多個客戶端的數(shù)據(jù)連接SocketChannel分配一個Processor,因此每個Processor內部都有一個隊列來保存這些新來的數(shù)據(jù)連接:private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
Processor的accpet方法(Acceptor會調用它)的代碼如下,它就把一個SocketChannel放到隊列中,然后喚醒Processor的selector。
/** * Queue up a new connection for reading */ def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup() }
需要注意的是:這個方法不是在Processor的線程里面執(zhí)行的,而是在Acceptor線程里面執(zhí)行的。
在run方法中,它首先調用方法configureNewConnections,如果有隊列中有新的SocketChannel,則它首先將其OP_READ事情注冊到該Processor的selector上面。
/** * Register any new connections that have been queued up */ private def configureNewConnections() { while (!newConnections.isEmpty) { val channel = newConnections.poll() debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}") val localHost = channel.socket().getLocalAddress.getHostAddress val localPort = channel.socket().getLocalPort val remoteHost = channel.socket().getInetAddress.getHostAddress val remotePort = channel.socket().getPort val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString selector.register(connectionId, channel) } }
1.2.2 讀取客戶端的數(shù)據(jù)
在Processor的run方法中,它也是調用selector的select方法來監(jiān)聽客戶端的數(shù)據(jù)請求,簡化的代碼如下:
val ready = selector.select(); if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null key = iter.next iter.remove() if(key.isReadable) read(key) } }
從上面的邏輯中可以看到,當一個客戶端數(shù)據(jù)傳輸過來,read方法會被調用,下面是read方法的簡化代碼。
def read(key: SelectionKey) { val socketChannel = channelFor(key) var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { receive = new BoundedByteBufferReceive(maxRequestSize) key.attach(receive) } val read = receive.readFrom(socketChannel) if(read < 0) { close(key) } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) key.attach(null) // explicitly reset interest ops to not READ, no need to wake up the selector just yet key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) } else { // more reading to be done key.interestOps(SelectionKey.OP_READ) wakeup() } }
read方法的流程為:
首先從SelectionKey中拿到對應的SocketChannel,并且取出attach在SelectionKey上的Receive對象,如果是第一次讀取,Receive對象為null,則創(chuàng)建一個BoundedByteBufferReceive,由它來處理具體的讀數(shù)據(jù)的邏輯??梢钥吹矫總€客戶端都有一個Receive對象來讀取數(shù)據(jù)。
如果數(shù)據(jù)從客戶端讀取完畢(receive.complete),則將讀取的數(shù)據(jù)封裝成Request對象,并添加到requestChannel中去。如果沒有讀取完畢(可能是客戶端還沒有發(fā)送完或者網(wǎng)絡延遲),那么就讓selector繼續(xù)監(jiān)聽這個通道的OP_READ事件。
因此,我們知道具體讀取數(shù)據(jù)是在BoundedByteBufferReceive里面完成的,而讀取完成后要交給RequestChannel,接下來我們來看這兩部分的代碼。
1.2.3 BoundedByteBufferReceive
BoundedByteBufferReceive中有2個ByteBuffer,分別是sizeBuffer和contentBuffer,其中sizeBuffer是固定的4個字節(jié),表示這次發(fā)送來的數(shù)據(jù)總共有多大,隨后再讀取對應大小的數(shù)據(jù)放到contentBuffer中。
主要的處理邏輯都是在readFrom這個方法中,簡化的代碼如下:
def readFrom(channel: ReadableByteChannel): Int = { var read = 0 // have we read the request size yet? if(sizeBuffer.remaining > 0) read += Utils.read(channel, sizeBuffer) // have we allocated the request buffer yet? if(contentBuffer == null && !sizeBuffer.hasRemaining) { sizeBuffer.rewind() val size = sizeBuffer.getInt() contentBuffer = byteBufferAllocate(size) } // if we have a buffer read some stuff into it if(contentBuffer != null) { read = Utils.read(channel, contentBuffer) // did we get everything? if(!contentBuffer.hasRemaining) { contentBuffer.rewind() complete = true } } read }
首先檢查sizeBuffer是不是都讀滿了,沒有的話就從對應的channel中讀取數(shù)據(jù)放到sizeBuffer中,就是下面這句,它會從channel中讀取最多等同于sizeBuffer中剩下空間數(shù)量的數(shù)據(jù)。---- Utils.read(channel, sizeBuffer)
當sizeBuffer讀取完成了,就知道真正的數(shù)據(jù)有多少了,因此就是按照這個大小來分配contentBuffer了。緊接著就是從channel讀取真正的數(shù)據(jù)放到contentBuffer中,當把contentBuffer讀滿以后就停止了并把complet標記為true。因此,可以看到客戶端在發(fā)送數(shù)據(jù)的時候需要先發(fā)送這次要發(fā)送數(shù)據(jù)的大小,然后再發(fā)送對應的數(shù)據(jù)。
這樣設計是因為java NIO在從channel中讀取數(shù)據(jù)的時候只能指定讀多少,而且數(shù)據(jù)也不是一次就能全部讀取完成的,用這種方式來保證數(shù)據(jù)都讀進來了。
簡而言之,Processor通過selector來監(jiān)聽它負責的那些數(shù)據(jù)通道,當通道上有數(shù)據(jù)可讀時,它就是把這個事情交給BoundedByteBufferReceive。BoundedByteBufferReceive先讀一個int來確定數(shù)據(jù)量有多少,然后再讀取真正的數(shù)據(jù)。那數(shù)據(jù)讀取進來后又是如何被處理的呢?下一節(jié)來分析對應的代碼。
1.3 kafka.network.RequestChannel
RequestChannel是Processor和Handler交換數(shù)據(jù)的地方。它包含了一個隊列requestQueue用來存放Processor加入的Request,Handler會從里面取出Request來處理;它還為每個Processor開辟了一個respondQueue,用來存放Handler處理了Request后給客戶端的Response。下面是一些源碼:
初始化requestQueue和responseQueues的代碼:
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) for(i <- 0 until numProcessors) responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
sendRequest方法:Processor在讀取完數(shù)據(jù)后,將數(shù)據(jù)封裝成一個Request對象然后調用這個方法將Request添加到requestQueue中。如果requestQueue滿的話,這個方法會阻塞在這里直到有Handler取走一個Request。
def sendRequest(request: RequestChannel.Request) { requestQueue.put(request) }
receiveRequest方法:Handler從requestQueue中取出Request,如果隊列為空,這個方法會阻塞在這里直到有Processor加入新的Request。
def receiveRequest(): RequestChannel.Request = requestQueue.take()
類似的sendResponse和receiveResponse就寫在這里,唯一的區(qū)別就是添加和取出Response的時候要指定Processor的id因為每個Processor都有其對應的responseQueue。
1.4 返回數(shù)據(jù)給客戶端
Processor不僅負責從客戶端讀取數(shù)據(jù),還要將Handler的處理結果返回給客戶端。在Processor的run方法(Processor是一個線程類),它會調用processNewResponses()來處理Handler的提供給客戶端的Response。簡化的代碼如下:
private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { val key = curr.request.requestKey.asInstanceOf[SelectionKey] curr.responseAction match { case RequestChannel.SendAction => { key.interestOps(SelectionKey.OP_WRITE) key.attach(curr) } } curr = requestChannel.receiveResponse(id) } }
它依次把requestChannel中responseQueue的Response取出來,然后將對應通道的OP_WRITE事件注冊到selector上。這和上面的configureNewConnections很類似。
然后當selector的select方法返回時,檢查是否有通道是WRITEABLE,如果有則調用Processor中的write方法。在write方法中,Processor又將具體寫數(shù)據(jù)的任務交給了Response中的Send對象。這和讀取數(shù)據(jù)的處理方式非常類似,就不細說了。
到此為止,我們分析了Processor是如何從客戶端讀取數(shù)據(jù)的,以及如何將Handler處理后的響應返回給客戶端。下一節(jié)將簡要分析一下Handler。
1.5 kafka.server.KafkaRequestHandler
Handler的職責是從requestChannel中的requestQueue取出Request,處理以后再將Response添加到requestChannel中的responseQueue中。
因為Handler是處理具體業(yè)務的,所以它可以有不同的實現(xiàn),或者把具體的處理再外包出去。我們就簡要看一下KafkaRequestHandler是如何做的。
KafkaRequestHandler實現(xiàn)了Runnable,因此是個線程類,除去錯誤處理的代碼后,其run方法可以簡化為如下代碼,它把所有的處理邏輯都交給了KafkaApis:
def run() { while(true) { var req : RequestChannel.Request = requestChannel.receiveRequest(300) apis.handle(req) } }
http://www.cnblogs.com/byrhuangqiang/p/6368008.html