本文我們將先從NioEventLoop開始來學習服務端的處理流程。話不多說,開始學習~~~~
我們從上文中已經(jīng)知道server在啟動的時候會開啟兩個線程:bossGroup和workerGroup,這兩個線程分別是boss線程池(用于接收client請求)和worker線程池(用于處理具體的讀寫操作),這兩個線程調(diào)度器都是NioEventLoopGroup,bossGroup有一個NioEventLoop,而worker線程池有n*cup數(shù)量個NioEventLoop。那么我們看看在NioEventLoop中的是如何開始的:
NioEventLoop本質(zhì)上是一個線程調(diào)度器(繼承自ScheduledExecutorService),當bind之后就開始run起一個線程:
(代碼一)
1 @Override 2 protected void run() { 3 for (;;) { 4 boolean oldWakenUp = wakenUp.getAndSet(false); 5 try { 6 if (hasTasks()) { 7 selectNow(); 8 } else { 9 select(oldWakenUp);10 11 if (wakenUp.get()) {12 selector.wakeup();13 }14 }15 16 cancelledKeys = 0;17 needsToSelectAgain = false;18 final int ioRatio = this.ioRatio;19 if (ioRatio == 100) {20 processSelectedKeys();21 runAllTasks();22 } else {23 final long ioStartTime = System.nanoTime();24 25 processSelectedKeys();26 27 final long ioTime = System.nanoTime() - ioStartTime;28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);29 }30 31 if (isShuttingDown()) {32 closeAll();33 if (confirmShutdown()) {34 break;35 }36 }37 } catch (Throwable t) {38 ...39 }40 }41 }
這個for(;;)里面就是boss線程的核心處理流程:
【代碼一主線】1,不斷地監(jiān)聽selector拿到socket句柄然后創(chuàng)建channel。每次run的時候先拿到wakeup的值,并且set進去false(PS:wakeup是什么鬼?一個AtomicBoolean,代表是否用戶喚醒,如果不人為將其set成true,永遠是false)。
【代碼一主線】2,如果任務隊列中已有任務,那么selectNow(),(PS:selectNow是什么鬼?我們知道selector.select()是一個阻塞調(diào)用,而selectNow方法是個非阻塞方法,如果沒有到達的socket句柄則返回0),因此若隊列中已有任務的話應該立即開始執(zhí)行,而不能阻塞到selector.select()上,否則則調(diào)用select()方法,繼續(xù)看select()里面:
(代碼二)
1 private void select(boolean oldWakenUp) throws IOException { 2 Selector selector = this.selector; 3 try { 4 int selectCnt = 0; 5 long currentTimeNanos = System.nanoTime(); 6 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 7 for (;;) { 8 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 9 if (timeoutMillis <= 0) {10 if (selectCnt == 0) {11 selector.selectNow();12 selectCnt = 1;13 }14 break;15 }16 17 int selectedKeys = selector.select(timeoutMillis);18 selectCnt ++;19 20 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {21 // 如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調(diào)度器中有任務,則break22 break;23 }24 if (Thread.interrupted()) {25 //如果線程被中斷則重置selectedKeys,同時break出本次循環(huán),所以不會陷入一個繁忙的循環(huán)。26 selectCnt = 1;27 break;28 }29 30 long time = System.nanoTime();31 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {32 // selector超時33 selectCnt = 1;34 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {35 // selector多次過早返回,重新建立并打開Selector36 ...37 }38 39 currentTimeNanos = time;40 }41 ...42 } catch (CancelledKeyException e) {43 ...44 }45 }
我們看到,select()方法進入一個for循環(huán)去select阻塞等待socket(這里的selector的實現(xiàn)在是根據(jù)操作系統(tǒng)和netty的版本來定的,在最新的netty中是使用的linux的epoll模型),同時入?yún)⒗镉小俺瑫r時間”,如果超過了這個時間仍然沒有socket到來則重新將selectCnt置為1重新循環(huán)等待,直到有socket到來。如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調(diào)度器中有任務,那么就是說該eventLoop有活干了,先break出去去干活,完了再打開selector重新阻塞等待。正常情況下會等待到一個socket,break出去之后回到代碼一
【代碼一主線】3,根據(jù)ioRatio來選擇任務執(zhí)行策略(PS:ioRatio是什么鬼?看了下用途應該是這樣的,這個ioRatio代表該eventLoop期望在I/O操作上花費時間的比例)。而NioEventLoop中有兩類操作,一類是I/O操作(讀寫之類),調(diào)用processSelectedKeys;一類是非I/O操作(例如register等),調(diào)用runAllTasks。如果ioRatio是100的話那么會按照順序執(zhí)行I/O操作->非I/O操作;如果不是會按照這個比例算出一個超時時間,在run任務隊列的時候如果超過了這個時間會立即返回,確保I/O操作可以得到及時的調(diào)用。
我們關心的是I/O操作,那么進入processSelectedKeys()看下發(fā)生了什么吧。
(代碼三)
1 private void processSelectedKeys() {2 if (selectedKeys != null) {3 processSelectedKeysOptimized(selectedKeys.flip());4 } else {5 processSelectedKeysPlain(selector.selectedKeys());6 }7 }
正常情況下會走到processSelectedKeysOptimized中:
(代碼四)
1 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { 2 for (int i = 0;; i ++) { 3 final SelectionKey k = selectedKeys[i]; 4 if (k == null) { 5 break; 6 } 7 selectedKeys[i] = null; 8 9 final Object a = k.attachment();10 11 if (a instanceof AbstractNioChannel) {12 processSelectedKey(k, (AbstractNioChannel) a);13 } else {14 @SuppressWarnings("unchecked")15 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;16 processSelectedKey(k, task);17 }18 19 if (needsToSelectAgain) {20 for (;;) {21 if (selectedKeys[i] == null) {22 break;23 }24 selectedKeys[i] = null;25 i++;26 }27 28 selectAgain();29 selectedKeys = this.selectedKeys.flip();30 i = -1;31 }32 }33 }
遍歷拿到所有的SelectionKey,然后判斷每個SelectionKey的attachment,上篇文章中已經(jīng)分析過給ServerBootstrap注冊的Channel是NioServerSocketChannel(繼承自AbstractNioChannel),因此進入processSelectedKey中:
(代碼五)
1 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 2 final NioUnsafe unsafe = ch.unsafe(); 3 if (!k.isValid()) { 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7 8 try { 9 int readyOps = k.readyOps();10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {11 unsafe.read();12 if (!ch.isOpen()) {13 return;14 }15 }16 if ((readyOps & SelectionKey.OP_WRITE) != 0) {17 ch.unsafe().forceFlush();18 }19 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {20 int ops = k.interestOps();21 ops &= ~SelectionKey.OP_CONNECT;22 k.interestOps(ops);23 24 unsafe.finishConnect();25 }26 } catch (CancelledKeyException ignored) {27 unsafe.close(unsafe.voidPromise());28 }29 }
在這里根據(jù)傳入的SelectionKey的已就緒操作類型來決定下一步的操作,如果是一個讀操作,那么進入AbstractNioMessageChannel$NioMessageUnsafe的read實現(xiàn),這里代碼很多,我們只貼一下核心的代碼:
(代碼六)
1 @Override 2 public void read() { 3 ... 4 final ChannelPipeline pipeline = pipeline(); 5 ... 6 try { 7 int size = readBuf.size(); 8 for (int i = 0; i < size; i ++) { 9 pipeline.fireChannelRead(readBuf.get(i));10 }11 ...12 readBuf.clear();13 pipeline.fireChannelReadComplete();14 } finally {15 }16 }
核心就是這個pipeline.fireChannelRead(readBuf.get(i));,這已經(jīng)到了pipeline階段,可能有些人會誤以為這是不是已經(jīng)到了worker線程中,但是不可能啊,我們的代碼其實在處于processSelectedKeys的邏輯里面。實際上,不論是boss還是worker,他們都是NioEventLoopGroup,玩法都是一樣的,只不過職責不一樣而已。boss也有自己的handler,上篇文章中我們提到了netty中的reactor模式的玩法,從Doug Lea的圖中可以看出,boss(實際上就是mainReactor)的handler其實就是這個acceptor。
在此我們順便學習一下netty中的handler:
從用途上來說,handler分為ChannelInboundHandler(讀)和ChannelOutboundHandler(寫),增加一層適配器產(chǎn)生了兩handler的Adapter,我們使用到的類都是繼承自這兩個Adapter。我們經(jīng)常用到的SimpleChannelInboundHandler就繼承ChannelInboundHandlerAdapter,用于初始化用戶handler鏈的ChannelInitializer和boss線程綁定的ServerBootstrapAcceptor也都繼承于此。
回到【代碼六主線】我們從pipeline.fireChannelRead繼續(xù)追蹤下去會追到ChannelInboundHandler的channelRead的實現(xiàn),而這里的Hander就是ServerBootstrapAcceptor。
(代碼七)
1 @Override 2 @SuppressWarnings("unchecked") 3 public void channelRead(ChannelHandlerContext ctx, Object msg) { 4 final Channel child = (Channel) msg; 5 6 child.pipeline().addLast(childHandler); 7 8 for (Entry<ChannelOption<?>, Object> e: childOptions) { 9 try {10 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {11 }12 } catch (Throwable t) {13 }14 }15 16 for (Entry<AttributeKey<?>, Object> e: childAttrs) {17 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());18 }19 20 try {21 childGroup.register(child).addListener(new ChannelFutureListener() {22 @Override23 public void operationComplete(ChannelFuture future) throws Exception {24 if (!future.isSuccess()) {25 forceClose(child, future.cause());26 }27 }28 });29 } catch (Throwable t) {30 forceClose(child, t);31 }32 }
由于ServerBootstrapAcceptor 很重要,我們先看一下都有什么內(nèi)容:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; }
我自己的理解:
childGroup就是subReactor(也就是worker線程);childHandler就是xxx;childOptions和childAttrs是為channel準備的一些參數(shù)。
回到【代碼七主線】在這里做了3件事:
1.為客戶端channel的pipeline中添加childHandler,那么這個childHandler是什么鬼呢?回憶一下上文中的服務端啟動代碼,有bootStrap.childHandler(xxx)這樣的代碼,所以此處就是把在服務端啟動時我們定義好的Handler鏈綁定給每個channel。
2.把我們服務端初始化時的參數(shù)綁定到每個channel中。
3.childGroup.register(child).addListener(new ChannelFutureListener()),后面這個異步listener作用很明確,問題是這個childGroup是什么鬼?我理解應該就是worker線程了。詳細說一下childGroup.register(child),繼續(xù)跟下去,跟到AbstractChannel$AbstractUnsafe中
(代碼八)
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 ... 4 AbstractChannel.this.eventLoop = eventLoop; 5 6 if (eventLoop.inEventLoop()) { 7 register0(promise); 8 } else { 9 ...10 } catch (Throwable t) {11 }12 }13 }
繼續(xù)register0:
(代碼九)
1 private void register0(ChannelPromise promise) { 2 try { 3 if (!promise.setUncancellable() || !ensureOpen(promise)) { 4 return; 5 } 6 boolean firstRegistration = neverRegistered; 7 doRegister(); 8 neverRegistered = false; 9 registered = true;10 safeSetSuccess(promise);11 pipeline.fireChannelRegistered();12 if (firstRegistration && isActive()) {13 pipeline.fireChannelActive();14 }15 } catch (Throwable t) {16 }17 }