1概要設計

Kafka SocketServer是基于Java NIO來開發(fā)的,采用了Reactor的模式,其中包含了1個Acceptor負責接受客戶端請求,N個Processor負責讀寫數(shù)據(jù),M個Handler來處理業(yè)務邏輯。在Acceptor和Processor,Processor和Handler之間都有隊列來緩沖請求。

1.1 kafka.network.Acceptor

這個類繼承了AbstractServerThread,實現(xiàn)了Runnable接口,因此它是一個線程類。它的主要職責是監(jiān)聽客戶端的連接請求,并建立和客戶端的數(shù)據(jù)傳輸通道,然后為這個客戶端指定一個Processor,它的工作就到此結束,這樣它就可以去響應下一個客戶端的連接請求了。

它的run方法的主要邏輯如下:

  1. 首先在ServerSocketChannel上注冊OP_ACCEPT事件:serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

  2. 然后開始等待客戶端的連接請求:val ready = nioSelector.select(500)

  3. 如果有連接進來,則將其分配給當前的processor,并且把當前processor指向下一個processor,也就是說它采用了Round Robin的方式來選擇processor
    :

     if (ready > 0) {        val keys = nioSelector.selectedKeys()        val iter = keys.iterator()        while (iter.hasNext && isRunning) {            val key = iter.next
                iter.remove()            if (key.isAcceptable)              accept(key, processors(currentProcessor))            // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
            }
          }

接下來看看Acceptor的accept方法的簡化代碼(省掉了異常處理)。先說點相關的知識,

  • SelectionKey是表示一個Channel和Selector的注冊關系。

  • 在Acceptor中的nioSelector,只有監(jiān)聽客戶端連接請求的ServerSocketChannel的OP_ACCEPT事件注冊在上面。當nioSelector的select方法返回時,則表示注冊在它上面的Channel發(fā)生了對應的事件。在Acceptor中,這個事件就是OP_ACCEPT,表示這個ServerSocketChannel的OP_ACCEPT事件發(fā)生了。

因此,Acceptor的accept方法的處理邏輯為:

  • 首先通過SelectionKey來拿到對應的ServerSocketChannel,并調用其accept方法來建立和客戶端的連接,然后拿到對應的SocketChannel并交給了processor。

  • 然后Acceptor的任務就完成了,開始去處理下一個客戶端的連接請求。Processor的accept方法的邏輯將在下一節(jié)介紹。

/*   * Accept a new connection   */
  def accept(key: SelectionKey, processor: Processor) {    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]    val socketChannel = serverSocketChannel.accept()
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      socketChannel.socket().setSendBufferSize(sendBufferSize)      debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
            .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
                  socketChannel.socket.getSendBufferSize, sendBufferSize,
                  socketChannel.socket.getReceiveBufferSize, recvBufferSize))

      processor.accept(socketChannel)
    
  }

1.2 kafka.network.Processor

Processor也是繼承自AbstractServerThread并實現(xiàn)Runnable接口,線程類。它負責從客戶端讀取數(shù)據(jù)和將響應返回給客戶端,不處理具體的業(yè)務邏輯。每個Processor都有一個Selector,用來監(jiān)聽多個客戶端,因此可以非阻塞地處理多個客戶端的讀寫請求。

1.2.1 處理新建立的連接

從上一節(jié)中可以看到,Acceptor會把多個客戶端的數(shù)據(jù)連接SocketChannel分配一個Processor,因此每個Processor內部都有一個隊列來保存這些新來的數(shù)據(jù)連接:private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

Processor的accpet方法(Acceptor會調用它)的代碼如下,它就把一個SocketChannel放到隊列中,然后喚醒Processor的selector。

 /**   * Queue up a new connection for reading   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)    wakeup()
  }

需要注意的是:這個方法不是在Processor的線程里面執(zhí)行的,而是在Acceptor線程里面執(zhí)行的。

在run方法中,它首先調用方法configureNewConnections,如果有隊列中有新的SocketChannel,則它首先將其OP_READ事情注冊到該Processor的selector上面。

/**   * Register any new connections that have been queued up   */
  private def configureNewConnections() {    while (!newConnections.isEmpty) {      val channel = newConnections.poll()        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        selector.register(connectionId, channel)
    }
  }

1.2.2 讀取客戶端的數(shù)據(jù)

在Processor的run方法中,它也是調用selector的select方法來監(jiān)聽客戶端的數(shù)據(jù)請求,簡化的代碼如下:

 val ready = selector.select();  if(ready > 0) {    val keys = selector.selectedKeys()    val iter = keys.iterator()    while(iter.hasNext && isRunning) {      var key: SelectionKey = null
      key = iter.next
      iter.remove()      if(key.isReadable)        read(key)
    }
  }

從上面的邏輯中可以看到,當一個客戶端數(shù)據(jù)傳輸過來,read方法會被調用,下面是read方法的簡化代碼。

def read(key: SelectionKey) {    val socketChannel = channelFor(key)    var receive = key.attachment.asInstanceOf[Receive]    if(key.attachment == null) {
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }    val read = receive.readFrom(socketChannel)    if(read < 0) {      close(key)
    } else if(receive.complete) {      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {      // more reading to be done
      key.interestOps(SelectionKey.OP_READ)      wakeup()
    }
  }

read方法的流程為:

  1. 首先從SelectionKey中拿到對應的SocketChannel,并且取出attach在SelectionKey上的Receive對象,如果是第一次讀取,Receive對象為null,則創(chuàng)建一個BoundedByteBufferReceive,由它來處理具體的讀數(shù)據(jù)的邏輯??梢钥吹矫總€客戶端都有一個Receive對象來讀取數(shù)據(jù)。

  2. 如果數(shù)據(jù)從客戶端讀取完畢(receive.complete),則將讀取的數(shù)據(jù)封裝成Request對象,并添加到requestChannel中去。如果沒有讀取完畢(可能是客戶端還沒有發(fā)送完或者網(wǎng)絡延遲),那么就讓selector繼續(xù)監(jiān)聽這個通道的OP_READ事件。

因此,我們知道具體讀取數(shù)據(jù)是在BoundedByteBufferReceive里面完成的,而讀取完成后要交給RequestChannel,接下來我們來看這兩部分的代碼。

1.2.3 BoundedByteBufferReceive

BoundedByteBufferReceive中有2個ByteBuffer,分別是sizeBuffer和contentBuffer,其中sizeBuffer是固定的4個字節(jié),表示這次發(fā)送來的數(shù)據(jù)總共有多大,隨后再讀取對應大小的數(shù)據(jù)放到contentBuffer中。

主要的處理邏輯都是在readFrom這個方法中,簡化的代碼如下:

def readFrom(channel: ReadableByteChannel): Int = {    var read = 0
    // have we read the request size yet?
    if(sizeBuffer.remaining > 0)
      read += Utils.read(channel, sizeBuffer)    // have we allocated the request buffer yet?
    if(contentBuffer == null && !sizeBuffer.hasRemaining) {
      sizeBuffer.rewind()      val size = sizeBuffer.getInt()
      contentBuffer = byteBufferAllocate(size)
    }    // if we have a buffer read some stuff into it
    if(contentBuffer != null) {
      read = Utils.read(channel, contentBuffer)      // did we get everything?
      if(!contentBuffer.hasRemaining) {
        contentBuffer.rewind()
        complete = true
      }
    }
    read
  }

首先檢查sizeBuffer是不是都讀滿了,沒有的話就從對應的channel中讀取數(shù)據(jù)放到sizeBuffer中,就是下面這句,它會從channel中讀取最多等同于sizeBuffer中剩下空間數(shù)量的數(shù)據(jù)。---- Utils.read(channel, sizeBuffer)

當sizeBuffer讀取完成了,就知道真正的數(shù)據(jù)有多少了,因此就是按照這個大小來分配contentBuffer了。緊接著就是從channel讀取真正的數(shù)據(jù)放到contentBuffer中,當把contentBuffer讀滿以后就停止了并把complet標記為true。因此,可以看到客戶端在發(fā)送數(shù)據(jù)的時候需要先發(fā)送這次要發(fā)送數(shù)據(jù)的大小,然后再發(fā)送對應的數(shù)據(jù)。

這樣設計是因為java NIO在從channel中讀取數(shù)據(jù)的時候只能指定讀多少,而且數(shù)據(jù)也不是一次就能全部讀取完成的,用這種方式來保證數(shù)據(jù)都讀進來了。

簡而言之,Processor通過selector來監(jiān)聽它負責的那些數(shù)據(jù)通道,當通道上有數(shù)據(jù)可讀時,它就是把這個事情交給BoundedByteBufferReceive。BoundedByteBufferReceive先讀一個int來確定數(shù)據(jù)量有多少,然后再讀取真正的數(shù)據(jù)。那數(shù)據(jù)讀取進來后又是如何被處理的呢?下一節(jié)來分析對應的代碼。

1.3 kafka.network.RequestChannel

RequestChannel是Processor和Handler交換數(shù)據(jù)的地方。它包含了一個隊列requestQueue用來存放Processor加入的Request,Handler會從里面取出Request來處理;它還為每個Processor開辟了一個respondQueue,用來存放Handler處理了Request后給客戶端的Response。下面是一些源碼:

初始化requestQueue和responseQueues的代碼:

private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)  for(i <- 0 until numProcessors)    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

sendRequest方法:Processor在讀取完數(shù)據(jù)后,將數(shù)據(jù)封裝成一個Request對象然后調用這個方法將Request添加到requestQueue中。如果requestQueue滿的話,這個方法會阻塞在這里直到有Handler取走一個Request。

  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

receiveRequest方法:Handler從requestQueue中取出Request,如果隊列為空,這個方法會阻塞在這里直到有Processor加入新的Request。

  def receiveRequest(): RequestChannel.Request =
    requestQueue.take()

類似的sendResponse和receiveResponse就寫在這里,唯一的區(qū)別就是添加和取出Response的時候要指定Processor的id因為每個Processor都有其對應的responseQueue。

1.4 返回數(shù)據(jù)給客戶端

Processor不僅負責從客戶端讀取數(shù)據(jù),還要將Handler的處理結果返回給客戶端。在Processor的run方法(Processor是一個線程類),它會調用processNewResponses()來處理Handler的提供給客戶端的Response。簡化的代碼如下:

private def processNewResponses() {    var curr = requestChannel.receiveResponse(id)    while(curr != null) {      val key = curr.request.requestKey.asInstanceOf[SelectionKey]
      curr.responseAction match {        case RequestChannel.SendAction => {
          key.interestOps(SelectionKey.OP_WRITE)
          key.attach(curr)
        }
      }
    curr = requestChannel.receiveResponse(id)
    }
  }

它依次把requestChannel中responseQueue的Response取出來,然后將對應通道的OP_WRITE事件注冊到selector上。這和上面的configureNewConnections很類似。

然后當selector的select方法返回時,檢查是否有通道是WRITEABLE,如果有則調用Processor中的write方法。在write方法中,Processor又將具體寫數(shù)據(jù)的任務交給了Response中的Send對象。這和讀取數(shù)據(jù)的處理方式非常類似,就不細說了。

到此為止,我們分析了Processor是如何從客戶端讀取數(shù)據(jù)的,以及如何將Handler處理后的響應返回給客戶端。下一節(jié)將簡要分析一下Handler。

1.5 kafka.server.KafkaRequestHandler

Handler的職責是從requestChannel中的requestQueue取出Request,處理以后再將Response添加到requestChannel中的responseQueue中。

因為Handler是處理具體業(yè)務的,所以它可以有不同的實現(xiàn),或者把具體的處理再外包出去。我們就簡要看一下KafkaRequestHandler是如何做的。

KafkaRequestHandler實現(xiàn)了Runnable,因此是個線程類,除去錯誤處理的代碼后,其run方法可以簡化為如下代碼,它把所有的處理邏輯都交給了KafkaApis:

 def run() {    while(true) {      var req : RequestChannel.Request = requestChannel.receiveRequest(300)
      apis.handle(req)
    }
  }

http://www.cnblogs.com/byrhuangqiang/p/6368008.html