閱讀目錄

由表及里

HTTP服務器用于響應來自客戶端的請求,當客戶端請求數(shù)逐漸增大時服務端的處理機制有多種,如tomcat的多線程、nginx的事件循環(huán)等。而對于node而言,由于其也采用事件循環(huán)和異步I/O機制,因此在高I/O并發(fā)的場景下性能非常好,但是由于單個node程序僅僅利用單核cpu,因此為了更好利用系統(tǒng)資源就需要fork多個node進程執(zhí)行HTTP服務器邏輯,所以node內(nèi)建模塊提供了child_process和cluster模塊。利用child_process模塊,我們可以執(zhí)行shell命令,可以fork子進程執(zhí)行代碼,也可以直接執(zhí)行二進制文件;利用cluster模塊,使用node封裝好的API、IPC通道和調(diào)度機可以非常簡單的創(chuàng)建包括一個master進程下HTTP代理服務器 + 多個worker進程多個HTTP應用服務器的架構,并提供兩種調(diào)度子進程算法。本文主要針對cluster模塊講述node是如何實現(xiàn)簡介高效的服務集群創(chuàng)建和調(diào)度的。那么就從代碼進入本文的主題:

code1

const cluster = require('cluster');const http = require('http');if (cluster.isMaster) {  let numReqs = 0;
  setInterval(() => {    console.log(`numReqs = ${numReqs}`);
  }, 1000);  function messageHandler(msg) {    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }  const numCPUs = require('os').cpus().length;  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

主進程創(chuàng)建多個子進程,同時接受子進程傳來的消息,循環(huán)輸出處理請求的數(shù)量;

子進程創(chuàng)建http服務器,偵聽8000端口并返回響應。

泛泛的大道理誰都了解,可是這套代碼如何運行在主進程和子進程中呢?父進程如何向子進程傳遞客戶端的請求?多個子進程共同偵聽8000端口,會不會造成端口reuse error?每個服務器進程最大可有效支持多少并發(fā)量?主進程下的代理服務器如何調(diào)度請求? 這些問題,如果不深入進去便永遠只停留在寫應用代碼的層面,而且不了解cluster集群創(chuàng)建的多進程與使用child_process創(chuàng)建的進程集群的區(qū)別,也寫不出符合業(yè)務的最優(yōu)代碼,因此,深入cluster還是有必要的。

cluster與net

cluster模塊與net模塊息息相關,而net模塊又和底層socket有聯(lián)系,至于socket則涉及到了系統(tǒng)內(nèi)核,這樣便由表及里的了解了node對底層的一些優(yōu)化配置,這是我們的思路。介紹前,筆者仔細研讀了node的js層模塊實現(xiàn),在基于自身理解的基礎上詮釋上節(jié)代碼的實現(xiàn)流程,力圖做到清晰、易懂,如果有某些紕漏也歡迎讀者指出,只有在互相交流中才能收獲更多。

一套代碼,多次執(zhí)行

很多人對code1代碼如何在主進程和子進程執(zhí)行感到疑惑,怎樣通過cluster.isMaster判斷語句內(nèi)的代碼是在主進程執(zhí)行,而其他代碼在子進程執(zhí)行呢?

其實只要你深入到了node源碼層面,這個問題很容易作答。cluster模塊的代碼只有一句:

module.exports = ('NODE_UNIQUE_ID' in process.env) ?                  require('internal/cluster/child') :                  require('internal/cluster/master');

只需要判斷當前進程有沒有環(huán)境變量“NODE_UNIQUE_ID”就可知道當前進程是否是主進程;而變量“NODE_UNIQUE_ID”則是在主進程fork子進程時傳遞進去的參數(shù),因此采用cluster.fork創(chuàng)建的子進程是一定包含“NODE_UNIQUE_ID”的。

這里需要指出的是,必須通過cluster.fork創(chuàng)建的子進程才有NODE_UNIQUE_ID變量,如果通過child_process.fork的子進程,在不傳遞環(huán)境變量的情況下是沒有NODE_UNIQUE_ID的。因此,當你在child_process.fork的子進程中執(zhí)行cluster.isMaster判斷時,返回 true。

主進程與服務器

code1中,并沒有在cluster.isMaster的條件語句中創(chuàng)建服務器,也沒有提供服務器相關的路徑、端口和fd,那么主進程中是否存在TCP服務器,有的話到底是什么時候怎么創(chuàng)建的?

相信大家在學習nodejs時閱讀的各種書籍都介紹過在集群模式下,主進程的服務器會接受到請求然后發(fā)送給子進程,那么問題就來到主進程的服務器到底是如何創(chuàng)建呢?主進程服務器的創(chuàng)建離不開與子進程的交互,畢竟與創(chuàng)建服務器相關的信息全在子進程的代碼中。

當子進程執(zhí)行

http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);

時,http模塊會調(diào)用net模塊(確切的說,http.Server繼承net.Server),創(chuàng)建net.Server對象,同時偵聽端口。創(chuàng)建net.Server實例,調(diào)用構造函數(shù)返回。創(chuàng)建的net.Server實例調(diào)用listen(8000),等待accpet連接。那么,子進程如何傳遞服務器相關信息給主進程呢?答案就在listen函數(shù)中。我保證,net.Server.prototype.listen函數(shù)絕沒有表面上看起來的那么簡單,它涉及到了許多IPC通信和兼容性處理,可以說HTTP服務器創(chuàng)建的所有邏輯都在listen函數(shù)中。

延伸下,在學習linux下的socket編程時,服務端的邏輯依次是執(zhí)行socket(),bind(),listen()和accept(),在接收到客戶端連接時執(zhí)行read(),write()調(diào)用完成TCP層的通信。那么,對應到node的net模塊好像只有listen()階段,這是不是很難對應socket的四個階段呢?其實不然,node的net模塊把“bind,listen”操作全部寫入了net.Server.prototype.listen中,清晰的對應底層socket和TCP三次握手,而向上層使用者只暴露簡單的listen接口。

code2

Server.prototype.listen = function() {

  ...  // 根據(jù)參數(shù)創(chuàng)建 handle句柄
  options = options._handle || options.handle || options;  // (handle[, backlog][, cb]) where handle is an object with a handle
  if (options instanceof TCP) {    this._handle = options;    this[async_id_symbol] = this._handle.getAsyncId();
    listenInCluster(this, null, -1, -1, backlogFromArgs);    return this;
  }

  ...  var backlog;  if (typeof options.port === 'number' || typeof options.port === 'string') {    if (!isLegalPort(options.port)) {      throw new RangeError('"port" argument must be >= 0 and < 65536');
    }
    backlog = options.backlog || backlogFromArgs;    // start TCP server listening on host:port
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive);
    } else { // Undefined host, listens on unspecified address
      // Default addressType 4 will be used to search for master server
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }    return this;
  }

  ...  throw new Error('Invalid listen argument: ' + util.inspect(options));
};

由于本文只探究cluster模式下HTTP服務器的相關內(nèi)容,因此我們只關注有關TCP服務器部分,其他的Pipe(domain socket)服務不考慮。

listen函數(shù)可以偵聽端口、路徑和指定的fd,因此在listen函數(shù)的實現(xiàn)中判斷各種參數(shù)的情況,我們最為關心的就是偵聽端口的情況,在成功進入條件語句后發(fā)現(xiàn)所有的情況最后都執(zhí)行了listenInCluster函數(shù)而返回,因此有必要繼續(xù)探究。

code3

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {

  ...  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd);    return;
  }  // 后續(xù)代碼為worker執(zhí)行邏輯
  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  ... 

  cluster._getServer(server, serverQuery, listenOnMasterHandle);
}

listenInCluster函數(shù)傳入了各種參數(shù),如server實例、ip、port、ip類型(IPv6和IPv4)、backlog(底層服務端socket處理請求的最大隊列)、fd等,它們不是必須傳入,比如創(chuàng)建一個TCP服務器,就僅僅需要一個port即可。

簡化后的listenInCluster函數(shù)很簡單,cluster模塊判斷當前進程為主進程時,執(zhí)行_listen2函數(shù);否則,在子進程中執(zhí)行cluster._getServer函數(shù),同時像函數(shù)傳遞serverQuery對象,即創(chuàng)建服務器需要的相關信息。

因此,我們可以大膽假設,子進程在cluster._getServer函數(shù)中向主進程發(fā)送了創(chuàng)建服務器所需要的數(shù)據(jù),即serverQuery。實際上也確實如此:

code4

cluster._getServer = function(obj, options, cb) {  const message = util._extend({
    act: 'queryServer',
    index: indexes[indexesKey],
    data: null
  }, options);

  send(message, function modifyHandle(reply, handle) => {    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });

};

子進程在該函數(shù)中向已建立的IPC通道發(fā)送內(nèi)部消息message,該消息包含之前提到的serverQuery信息,同時包含act: 'queryServer'字段,等待服務端響應后繼續(xù)執(zhí)行回調(diào)函數(shù)modifyHandle。

主進程接收到子進程發(fā)送的內(nèi)部消息,會根據(jù)act: 'queryServer'執(zhí)行對應queryServer方法,完成服務器的創(chuàng)建,同時發(fā)送回復消息給子進程,子進程執(zhí)行回調(diào)函數(shù)modifyHandle,繼續(xù)接下來的操作。

至此,針對主進程在cluster模式下如何創(chuàng)建服務器的流程已完全走通,主要的邏輯是在子進程服務器的listen過程中實現(xiàn)。

net模塊與socket

上節(jié)提到了node中創(chuàng)建服務器無法與socket創(chuàng)建對應的問題,本節(jié)就該問題做進一步解釋。在net.Server.prototype.listen函數(shù)中調(diào)用了listenInCluster函數(shù),listenInCluster會在主進程或者子進程的回調(diào)函數(shù)中調(diào)用_listen2函數(shù),對應底層服務端socket建立階段的正是在這里。

function setupListenHandle(address, port, addressType, backlog, fd) {  // worker進程中,_handle為fake對象,無需創(chuàng)建
  if (this._handle) {
    debug('setupListenHandle: have a handle already');
  } else {
    debug('setupListenHandle: create a handle');    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd);    this._handle = rval;
  }  this[async_id_symbol] = getNewAsyncId(this._handle);  this._handle.onconnection = onconnection;  var err = this._handle.listen(backlog || 511);

}

通過createServerHandle函數(shù)創(chuàng)建句柄(句柄可理解為用戶空間的socket),同時給屬性onconnection賦值,最后偵聽端口,設定backlog。

那么,socket處理請求過程“socket(),bind()”步驟就是在createServerHandle完成。

function createServerHandle(address, port, addressType, fd) {  var handle;  // 針對網(wǎng)絡連接,綁定地址
  if (address || port || isTCP) {    if (!address) {
      err = handle.bind6('::', port);      if (err) {
        handle.close();        return createServerHandle('0.0.0.0', port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port);
    } else {
      err = handle.bind(address, port);
    }
  }  return handle;
}

在createServerHandle中,我們看到了如何創(chuàng)建socket(createServerHandle在底層利用node自己封裝的類庫創(chuàng)建TCP handle),也看到了bind綁定ip和地址,那么node的net模塊如何接收客戶端請求呢?

必須深入c++模塊才能了解node是如何實現(xiàn)在c++層面調(diào)用js層設置的onconnection回調(diào)屬性,v8引擎提供了c++和js層的類型轉(zhuǎn)換和接口透出,在c++的tcp_wrap中:

void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
  TCPWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap,
                          args.Holder(),
                          args.GetReturnValue().Set(UV_EBADF));  int backloxxg = args[0]->Int32Value();  int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
                      backlog,
                      OnConnection);
  args.GetReturnValue().Set(err);
}

我們關注uv_listen函數(shù),它是libuv封裝后的函數(shù),傳入了handle_,backlog和OnConnection回調(diào)函數(shù),其中handle_為node調(diào)用libuv接口創(chuàng)建的socket封裝,OnConnection函數(shù)為socket接收客戶端連接時執(zhí)行的操作。我們可能會猜測在js層設置的onconnction函數(shù)最終會在OnConnection中調(diào)用,于是進一步深入探查node的connection_wrap c++模塊:

template <typename WrapType, typename UVType>void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle,                                                    int status) {  if (status == 0) {    if (uv_accept(handle, client_handle))      return;    // Successful accept. Call the onconnection callback in JavaScript land.
    argv[1] = client_obj;
  }
  wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);
}

過濾掉多余信息便于分析。當新的客戶端連接到來時,libuv調(diào)用OnConnection,在該函數(shù)內(nèi)執(zhí)行uv_accept接收連接,最后將js層的回調(diào)函數(shù)onconnection[通過env->onconnection_string()獲取js的回調(diào)]和接收到的客戶端socket封裝傳入MakeCallback中。其中,argv數(shù)組的第一項為錯誤信息,第二項為已連接的clientSocket封裝,最后在MakeCallback中執(zhí)行js層的onconnection函數(shù),該函數(shù)的參數(shù)正是argv數(shù)組傳入的數(shù)據(jù),“錯誤代碼和clientSocket封裝”。

js層的onconnection回調(diào)

function onconnection(err, clientHandle) {  var handle = this;  if (err) {    self.emit('error', errnoException(err, 'accept'));    return;
  }  var socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect
  });
  socket.readable = socket.writable = true;  self.emit('connection', socket);
}

這樣,node在C++層調(diào)用js層的onconnection函數(shù),構建node層的socket對象,并觸發(fā)connection事件,完成底層socket與node net模塊的連接與請求打通。

至此,我們打通了socket連接建立過程與net模塊(js層)的流程的交互,這種封裝讓開發(fā)者在不需要查閱底層接口和數(shù)據(jù)結(jié)構的情況下,僅使用node提供的http模塊就可以快速開發(fā)一個應用服務器,將目光聚集在業(yè)務邏輯中。

backlog是已連接但未進行accept處理的socket隊列大小。在linux 2.2以前,backlog大小包括了半連接狀態(tài)和全連接狀態(tài)兩種隊列大小。linux 2.2以后,分離為兩個backlog來分別限制半連接SYN_RCVD狀態(tài)的未完成連接隊列大小跟全連接ESTABLISHED狀態(tài)的已完成連接隊列大小。這里的半連接狀態(tài),即在三次握手中,服務端接收到客戶端SYN報文后并發(fā)送SYN+ACK報文后的狀態(tài),此時服務端等待客戶端的ACK,全連接狀態(tài)即服務端和客戶端完成三次握手后的狀態(tài)。backlog并非越大越好,當?shù)却齛ccept隊列過長,服務端無法及時處理排隊的socket,

http://www.cnblogs.com/accordion/p/7207740.html