本文我們將先從NioEventLoop開始來學習服務端的處理流程。話不多說,開始學習~~~~

  我們從上文中已經(jīng)知道server在啟動的時候會開啟兩個線程:bossGroup和workerGroup,這兩個線程分別是boss線程池(用于接收client請求)和worker線程池(用于處理具體的讀寫操作),這兩個線程調(diào)度器都是NioEventLoopGroup,bossGroup有一個NioEventLoop,而worker線程池有n*cup數(shù)量個NioEventLoop。那么我們看看在NioEventLoop中的是如何開始的:

  NioEventLoop本質(zhì)上是一個線程調(diào)度器(繼承自ScheduledExecutorService),當bind之后就開始run起一個線程:  

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 (代碼一)
1
   @Override 2     protected void run() { 3         for (;;) { 4             boolean oldWakenUp = wakenUp.getAndSet(false); 5             try { 6                 if (hasTasks()) { 7                    selectNow(); 8                 } else { 9                    select(oldWakenUp);10 11                     if (wakenUp.get()) {12                        selector.wakeup();13                    }14                }15 16                 cancelledKeys = 0;17                 needsToSelectAgain = false;18                 final int ioRatio = this.ioRatio;19                 if (ioRatio == 100) {20                    processSelectedKeys();21                    runAllTasks();22                 } else {23                     final long ioStartTime = System.nanoTime();24 25                    processSelectedKeys();26 27                     final long ioTime = System.nanoTime() - ioStartTime;28                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);29                }30 31                 if (isShuttingDown()) {32                    closeAll();33                     if (confirmShutdown()) {34                         break;35                    }36                }37             } catch (Throwable t) {38                ...39            }40        }41     }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 

  這個for(;;)里面就是boss線程的核心處理流程:

  【代碼一主線】1,不斷地監(jiān)聽selector拿到socket句柄然后創(chuàng)建channel。每次run的時候先拿到wakeup的值,并且set進去false(PS:wakeup是什么鬼?一個AtomicBoolean,代表是否用戶喚醒,如果不人為將其set成true,永遠是false)。

    【代碼一主線】2,如果任務隊列中已有任務,那么selectNow(),(PS:selectNow是什么鬼?我們知道selector.select()是一個阻塞調(diào)用,而selectNow方法是個非阻塞方法,如果沒有到達的socket句柄則返回0),因此若隊列中已有任務的話應該立即開始執(zhí)行,而不能阻塞到selector.select()上,否則則調(diào)用select()方法,繼續(xù)看select()里面:

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 (代碼二)
1
    private void select(boolean oldWakenUp) throws IOException { 2         Selector selector = this.selector; 3         try { 4             int selectCnt = 0; 5             long currentTimeNanos = System.nanoTime(); 6             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 7             for (;;) { 8                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 9                 if (timeoutMillis <= 0) {10                     if (selectCnt == 0) {11                        selector.selectNow();12                         selectCnt = 1;13                    }14                     break;15                }16 17                 int selectedKeys = selector.select(timeoutMillis);18                 selectCnt ++;19 20                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {21                     // 如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調(diào)度器中有任務,則break22                     break;23                }24                 if (Thread.interrupted()) {25                     //如果線程被中斷則重置selectedKeys,同時break出本次循環(huán),所以不會陷入一個繁忙的循環(huán)。26                     selectCnt = 1;27                     break;28                }29 30                 long time = System.nanoTime();31                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {32                     // selector超時33                    selectCnt = 1;34                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {35                     // selector多次過早返回,重新建立并打開Selector36                    ...37                }38 39                 currentTimeNanos = time;40            }41            ...42         } catch (CancelledKeyException e) {43            ...44        }45     }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

   我們看到,select()方法進入一個for循環(huán)去select阻塞等待socket(這里的selector的實現(xiàn)在是根據(jù)操作系統(tǒng)和netty的版本來定的,在最新的netty中是使用的linux的epoll模型),同時入?yún)⒗镉小俺瑫r時間”,如果超過了這個時間仍然沒有socket到來則重新將selectCnt置為1重新循環(huán)等待,直到有socket到來。如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調(diào)度器中有任務,那么就是說該eventLoop有活干了,先break出去去干活,完了再打開selector重新阻塞等待。正常情況下會等待到一個socket,break出去之后回到代碼一

  【代碼一主線】3,根據(jù)ioRatio來選擇任務執(zhí)行策略(PS:ioRatio是什么鬼?看了下用途應該是這樣的,這個ioRatio代表該eventLoop期望在I/O操作上花費時間的比例)。而NioEventLoop中有兩類操作,一類是I/O操作(讀寫之類),調(diào)用processSelectedKeys;一類是非I/O操作(例如register等),調(diào)用runAllTasks。如果ioRatio是100的話那么會按照順序執(zhí)行I/O操作->非I/O操作;如果不是會按照這個比例算出一個超時時間,在run任務隊列的時候如果超過了這個時間會立即返回,確保I/O操作可以得到及時的調(diào)用。

  我們關心的是I/O操作,那么進入processSelectedKeys()看下發(fā)生了什么吧。

  

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

(代碼三)
1
    private void processSelectedKeys() {2         if (selectedKeys != null) {3            processSelectedKeysOptimized(selectedKeys.flip());4         } else {5            processSelectedKeysPlain(selector.selectedKeys());6        }7     }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 

  正常情況下會走到processSelectedKeysOptimized中:

  

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 (代碼四)
1
  private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { 2         for (int i = 0;; i ++) { 3             final SelectionKey k = selectedKeys[i]; 4             if (k == null) { 5                 break; 6            } 7             selectedKeys[i] = null; 8 9             final Object a = k.attachment();10 11             if (a instanceof AbstractNioChannel) {12                processSelectedKey(k, (AbstractNioChannel) a);13             } else {14                 @SuppressWarnings("unchecked")15                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;16                processSelectedKey(k, task);17            }18 19             if (needsToSelectAgain) {20                 for (;;) {21                     if (selectedKeys[i] == null) {22                         break;23                    }24                     selectedKeys[i] = null;25                     i++;26                }27 28                selectAgain();29                 selectedKeys = this.selectedKeys.flip();30                 i = -1;31            }32        }33     }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  

  遍歷拿到所有的SelectionKey,然后判斷每個SelectionKey的attachment,上篇文章中已經(jīng)分析過給ServerBootstrap注冊的Channel是NioServerSocketChannel(繼承自AbstractNioChannel),因此進入processSelectedKey中:

 

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 (代碼五)
1
  private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 2         final NioUnsafe unsafe = ch.unsafe(); 3         if (!k.isValid()) { 4            unsafe.close(unsafe.voidPromise()); 5             return; 6        } 7 8         try { 9             int readyOps = k.readyOps();10             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {11                unsafe.read();12                 if (!ch.isOpen()) {13                     return;14                }15            }16             if ((readyOps & SelectionKey.OP_WRITE) != 0) {17                ch.unsafe().forceFlush();18            }19             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {20                 int ops = k.interestOps();21                 ops &= ~SelectionKey.OP_CONNECT;22                k.interestOps(ops);23 24                unsafe.finishConnect();25            }26         } catch (CancelledKeyException ignored) {27            unsafe.close(unsafe.voidPromise());28        }29     }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  

  在這里根據(jù)傳入的SelectionKey的已就緒操作類型來決定下一步的操作,如果是一個讀操作,那么進入AbstractNioMessageChannel$NioMessageUnsafe的read實現(xiàn),這里代碼很多,我們只貼一下核心的代碼:

 

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

(代碼六)
1
       @Override 2         public void read() { 3            ... 4             final ChannelPipeline pipeline = pipeline(); 5            ... 6             try { 7                 int size = readBuf.size(); 8                 for (int i = 0; i < size; i ++) { 9                    pipeline.fireChannelRead(readBuf.get(i));10                }11                ...12                readBuf.clear();13                pipeline.fireChannelReadComplete();14             } finally {15            }16         }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  核心就是這個pipeline.fireChannelRead(readBuf.get(i));,這已經(jīng)到了pipeline階段,可能有些人會誤以為這是不是已經(jīng)到了worker線程中,但是不可能啊,我們的代碼其實在處于processSelectedKeys的邏輯里面。實際上,不論是boss還是worker,他們都是NioEventLoopGroup,玩法都是一樣的,只不過職責不一樣而已。boss也有自己的handler,上篇文章中我們提到了netty中的reactor模式的玩法,從Doug Lea的圖中可以看出,boss(實際上就是mainReactor)的handler其實就是這個acceptor。

  在此我們順便學習一下netty中的handler:

  平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  從用途上來說,handler分為ChannelInboundHandler(讀)和ChannelOutboundHandler(寫),增加一層適配器產(chǎn)生了兩handler的Adapter,我們使用到的類都是繼承自這兩個Adapter。我們經(jīng)常用到的SimpleChannelInboundHandler就繼承ChannelInboundHandlerAdapter,用于初始化用戶handler鏈的ChannelInitializer和boss線程綁定的ServerBootstrapAcceptor也都繼承于此。

  回到【代碼六主線】我們從pipeline.fireChannelRead繼續(xù)追蹤下去會追到ChannelInboundHandler的channelRead的實現(xiàn),而這里的Hander就是ServerBootstrapAcceptor。

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

 (代碼七)
1
       @Override 2         @SuppressWarnings("unchecked") 3         public void channelRead(ChannelHandlerContext ctx, Object msg) { 4             final Channel child = (Channel) msg; 5 6            child.pipeline().addLast(childHandler); 7 8             for (Entry<ChannelOption<?>, Object> e: childOptions) { 9                 try {10                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {11                    }12                 } catch (Throwable t) {13                }14            }15 16             for (Entry<AttributeKey<?>, Object> e: childAttrs) {17                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());18            }19 20             try {21                 childGroup.register(child).addListener(new ChannelFutureListener() {22                    @Override23                     public void operationComplete(ChannelFuture future) throws Exception {24                         if (!future.isSuccess()) {25                            forceClose(child, future.cause());26                        }27                    }28                });29             } catch (Throwable t) {30                forceClose(child, t);31            }32         }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  由于ServerBootstrapAcceptor 很重要,我們先看一下都有什么內(nèi)容:

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {        private final EventLoopGroup childGroup;        private final ChannelHandler childHandler;        private final Entry<ChannelOption<?>, Object>[] childOptions;        private final Entry<AttributeKey<?>, Object>[] childAttrs;
}

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  我自己的理解:

  childGroup就是subReactor(也就是worker線程);childHandler就是xxx;childOptions和childAttrs是為channel準備的一些參數(shù)。

  回到【代碼七主線】在這里做了3件事:

  1.為客戶端channel的pipeline中添加childHandler,那么這個childHandler是什么鬼呢?回憶一下上文中的服務端啟動代碼,有bootStrap.childHandler(xxx)這樣的代碼,所以此處就是把在服務端啟動時我們定義好的Handler鏈綁定給每個channel。

  2.把我們服務端初始化時的參數(shù)綁定到每個channel中。

  3.childGroup.register(child).addListener(new ChannelFutureListener()),后面這個異步listener作用很明確,問題是這個childGroup是什么鬼?我理解應該就是worker線程了。詳細說一下childGroup.register(child),繼續(xù)跟下去,跟到AbstractChannel$AbstractUnsafe中

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

(代碼八)
1
       @Override 2         public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3            ... 4             AbstractChannel.this.eventLoop = eventLoop; 5 6             if (eventLoop.inEventLoop()) { 7                register0(promise); 8             } else { 9                ...10                 } catch (Throwable t) {11                }12            }13         }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

  繼續(xù)register0:

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓

(代碼九)
1
    private void register0(ChannelPromise promise) { 2             try { 3                 if (!promise.setUncancellable() || !ensureOpen(promise)) { 4                     return; 5                } 6                 boolean firstRegistration = neverRegistered; 7                doRegister(); 8                 neverRegistered = false; 9                 registered = true;10                safeSetSuccess(promise);11                pipeline.fireChannelRegistered();12                 if (firstRegistration && isActive()) {13                    pipeline.fireChannelActive();14                }15             } catch (Throwable t) {16            }17         }

平面設計培訓,網(wǎng)頁設計培訓,美工培訓,游戲開發(fā),動畫培訓