Unsafe是托委訪問socket,那么Channel是直接提供給開發(fā)者使用的
Channel 主要有兩個實現(xiàn) NioServerSocketChannel同NioSocketChannel 致于其它不常用不在研究范圍內(nèi)
NioServerSocketChannel 是給server用的,程序由始至終只有一個NioServerSocketChannel
NioSocketChannel 是給客戶端用的,每個連接生成一個NioSocketChannel 對象
NioSocketChannel同NioSocketChannel的繼承關(guān)系
NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel
NioServerSocketChannel -> AbstractNioMessageChannel-> AbstractNioChannel -> AbstractChannel
小提示:如果看文字不夠直觀可以在eclipse里按快捷鍵 選擇類 ctrl+t
channel有unsafe相應(yīng)的實現(xiàn)類,反之亦是。其實功能是很簡單的,劃分太多對象目的是對某部分功能重用,有時也可能因過渡設(shè)計造成
對于channel我們主要分析 I/O read/write操作
NioServerSocketChannel AbstractNioMessageChannel SelectorProvider DEFAULT_SELECTOR_PROVIDER = (= NioServerSocketChannelConfig( doReadMessages(List<Object> buf) = (ch != buf.add( NioSocketChannel( 1 0 doWrite(ChannelOutboundBuffer in)
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } //////////////////////////////這部分是unsafe底層調(diào)用上層的實現(xiàn)////////////////////////////////////////////// @Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); //這里設(shè)置byteBuf寫入數(shù)據(jù)坐標 allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); } @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); //沒有數(shù)據(jù)退出 if (size == 0) { clearOpWrite(); break; } long writtenBytes = 0; //記錄寫數(shù)據(jù)size boolean done = false; //是否完成 boolean setOpWrite = false; ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); //這里有三種分支處理 //如果沒有ByteBuffer 有可能只發(fā)送幾個byte //1跟default邏輯其實是一樣的 switch (nioBufferCnt) { case 0: //調(diào)用父類 AbstractNioByteChannel doWrite,邏輯基本相同,不同的是AbstractNioByteChannel處理的是byte 實現(xiàn)調(diào)用的是 doWriteBytes(ByteBuf buf)方法。。。 super.doWrite(in); return; case 1: //這里只循環(huán)16次,可以看出是復制下面代碼的哈。。。 ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: //多個ByteBuffer時跟上面邏輯一樣 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // Did not write all buffers completely. incompleteWrite(setOpWrite); break; } } } }
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //生成NioSocketChannel時就綁定 unsafe pipeline protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } }protected abstract class AbstractUnsafe implements Unsafe { private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); // doRegister 是調(diào)用 AbstractNioChannel selectionKey = javaChannel().register(eventLoop().selector, 0, this); neverRegistered = false; registered = true; //這里是添加 Handler 每個Handler會生成一個Context pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //通知Handler Registered pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { //通知Handler Active pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { //....... } } }
小結(jié):看似很復雜的Channel實現(xiàn)其實沒想象難,大多數(shù)讀寫坐標記錄交給ByteBuf處理掉了
1.server每個client連接轉(zhuǎn)換成NioSocketChannel對象
2.構(gòu)建NioSocketChannel時就已經(jīng)生成 unsafe、pipeline
作者: | solq |
博客地址: | http://www.cnblogs.com/solq111 |
博客版權(quán): | 本文以學習、研究和分享為主,歡迎轉(zhuǎn)載,但必須在文章頁面明顯位置給出原文連接。 如果文中有不妥或者錯誤的地方還望高手的你指出,以免誤人子弟。如果覺得本文對你有所幫助不如【推薦】一下!如果你有更好的建議,不如留言一起討論,共同進步! 再次感謝您耐心的讀完本篇文章。 QQ群:9547527 |
http://www.cnblogs.com/solq111/p/7066208.html