NioEventLoop 是 JDK NIO 多路處理的實現，同時修復了 JDK NIO 的 bug
1.NioEventLoop繼承SingleThreadEventLoop 重用單線程處理
2.NioEventLoop是組成 pool EventLoopGroup 基本單元
總之有很多邊界判斷跟業務經驗之類的代碼，非常煩瑣
重要屬性
public final class NioEventLoop extends SingleThreadEventLoop { //綁定 selector Selector selector; //優(yōu)化過的Set集合 private SelectedSelectionKeySet selectedKeys; //引用全局 SelectorProvider private final SelectorProvider provider; /////////////////////////////////////////// //為true時執(zhí)行selector.wakeup() private final AtomicBoolean wakenUp = new AtomicBoolean(); //io任務(wù)占時比率 private volatile int ioRatio = 50; //記錄selectionKey撤銷次數(shù) private int cancelledKeys; //處理selector.selectNow() 標(biāo)志 private boolean needsToSelectAgain; }
替換 Selector 的 selectedKeySet 字段與重構 Selector
優化 selectedKeySet 集合用的是 double cache（雙緩衝）技術，這種技術在圖形渲染處理中用得比較多
//netty 用到反射加 AccessController技術(shù)替換掉 Selector selectedKeySet 字段 private Selector openSelector() { final Selector selector = provider.openSelector(); final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { //用到反射技術(shù)更改 SelectorImpl 字段 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); return null; } }); return selector; }//重新構(gòu)建Selector private void rebuildSelector0() { final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } newSelector = openSelector(); //遷移處理 int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { //過濾key是否合法 已處理 if (!key.isValid() || key.channel().keyFor(newSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // channel重新綁定SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { //出錯處理 netty認(rèn)為 socket已關(guān)閉 if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { 
@SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } selector = newSelector; oldSelector.close(); }
double cache 實現
/**
 * A {@link SelectionKey} set backed by two plain arrays ("double cache").
 *
 * <p>At any time one array (A or B) is active and receives {@link #add(SelectionKey)}
 * calls. {@link #flip()} hands the active array to the caller and switches to the other
 * one, whose size is reset to zero. Arrays start with 1024 slots and double whenever
 * they become full.
 *
 * <p>This is not a general-purpose {@code Set}: duplicates are not detected, and
 * {@link #remove(Object)} and {@link #contains(Object)} always return {@code false}.
 */
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    private static final int INITIAL_CAPACITY = 1024;

    private SelectionKey[] keysA;
    private int keysASize;
    private SelectionKey[] keysB;
    private int keysBSize;
    /** {@code true} while array A is the active (writable) one. */
    private boolean isA = true;

    SelectedSelectionKeySet() {
        keysA = new SelectionKey[INITIAL_CAPACITY];
        keysB = keysA.clone();
    }

    /**
     * Appends a key to the active array, doubling its capacity once it is full.
     *
     * @param o the key to append
     * @return {@code false} if {@code o} is {@code null}, otherwise {@code true}
     */
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        if (isA) {
            int size = keysASize;
            keysA[size++] = o;
            keysASize = size;
            // Grow eagerly so that index keysASize is always a valid slot (flip() relies on it).
            if (size == keysA.length) {
                doubleCapacityA();
            }
        } else {
            int size = keysBSize;
            keysB[size++] = o;
            keysBSize = size;
            if (size == keysB.length) {
                doubleCapacityB();
            }
        }
        return true;
    }

    private void doubleCapacityA() {
        SelectionKey[] newKeysA = new SelectionKey[keysA.length << 1];
        System.arraycopy(keysA, 0, newKeysA, 0, keysASize);
        keysA = newKeysA;
    }

    private void doubleCapacityB() {
        SelectionKey[] newKeysB = new SelectionKey[keysB.length << 1];
        System.arraycopy(keysB, 0, newKeysB, 0, keysBSize);
        keysB = newKeysB;
    }

    /**
     * Returns the active array and switches to the other one.
     *
     * <p>The returned array is {@code null}-terminated: the slot right after the last
     * added key is set to {@code null}. The array that becomes active has its size reset
     * to zero.
     *
     * @return the backing array that was active before the call (not a copy)
     */
    SelectionKey[] flip() {
        if (isA) {
            isA = false;
            keysA[keysASize] = null;
            keysBSize = 0;
            return keysA;
        } else {
            isA = true;
            keysB[keysBSize] = null;
            keysASize = 0;
            return keysB;
        }
    }

    /** @return the number of keys in the active array */
    @Override
    public int size() {
        return isA ? keysASize : keysBSize;
    }

    /**
     * Always returns {@code false}; keys are only ever dropped by {@link #flip()}.
     * Overridden so that the inherited iterator-based removal (which would throw) is
     * never used. NOTE(review): the JDK selector is presumed to call this when a key is
     * deregistered — confirm against the JDK version in use.
     */
    @Override
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Always returns {@code false}; membership is not tracked, which avoids a linear scan.
     */
    @Override
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Returns a read-only iterator over the keys currently in the active array.
     * Required by {@link AbstractSet}; without it the class cannot be compiled.
     * The iterator covers the keys present at the time of the call.
     */
    @Override
    public java.util.Iterator<SelectionKey> iterator() {
        final SelectionKey[] keys = isA ? keysA : keysB;
        final int count = size();
        return new java.util.Iterator<SelectionKey>() {
            private int index;

            @Override
            public boolean hasNext() {
                return index < count;
            }

            @Override
            public SelectionKey next() {
                if (index >= count) {
                    throw new java.util.NoSuchElementException();
                }
                return keys[index++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
重載 Selector select 邏輯，修復 JDK 會產生的 bug
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; int selectCnt = 0; long currentTimeNanos = System.nanoTime(); //通過delayNanos計算出 select結(jié)束時間 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //計算出超時并轉(zhuǎn)換成毫秒,再加上延時固定0.5毫秒 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } //如果有非IO任務(wù),優(yōu)先等侍selector操作 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //阻塞當(dāng)前線程 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; //有IO,非IO,計劃任務(wù),wakenUp狀態(tài)認(rèn)為已完成 select 處理 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } //如果當(dāng)前線程中斷,netty認(rèn)為關(guān)閉了服務(wù),退出處理 if (Thread.interrupted()) { selectCnt = 1; break; } //相當(dāng)于下面等價,意思是當(dāng)前時間大于或等于 (selectDeadLineNanos + 0.5毫秒) selectCnt 重置 //currentTimeNanos + (System.nanoTime() - selectDeadLineNanos - 500000L ) >= currentTimeNanos //System.nanoTime() - selectDeadLineNanos - 500000L >= 0 //System.nanoTime() >= selectDeadLineNanos + 500000L long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 默認(rèn)值512,重構(gòu)selector rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } //刷新當(dāng)前時間 currentTimeNanos = time; } }
分發 IO 與非 IO 任務的邏輯實現
//這部分做了代碼整理 @Override protected void run() { for (;;) { try { //檢查是否有非IO任務(wù)同WAKEUP_TASK任務(wù) if(!hasTasks()){ continue; } //有任務(wù)就觸發(fā)重寫的 select select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio;//默認(rèn)值50 try { final long ioStartTime = System.nanoTime(); //processSelectedKeys(); //一般會selectedKeys不會為null做了優(yōu)化處理
作者: | solq |
博客地址: | http://www.cnblogs.com/solq111 |
博客版權: | 本文以學習、研究和分享為主，歡迎轉載，但必須在文章頁面明顯位置給出原文鏈接。如果文中有不妥或者錯誤的地方，還望高手的你指出，以免誤人子弟。如果覺得本文對你有所幫助，不如【推薦】一下！如果你有更好的建議，不如留言一起討論，共同進步！ |
http://www.cnblogs.com/solq111/p/6925413.html