這里主要圍繞著Java NIO展開,從Java NIO的基本使用,到介紹Linux下NIO API,再到Java Selector其底層的實現(xiàn)原理。

  • Java NIO基本使用

  • Linux下的NIO系統(tǒng)調(diào)用介紹

  • Selector原理

  • Channel和Buffer之間的堆外內(nèi)存

Java NIO基本使用

從JDK NIO文檔里面可以發(fā)現(xiàn),Java將其劃分成了三大塊:Channel,Buffer以及多路復(fù)用Selector。Channel的存在,封裝了對什么實體的連接通道(如網(wǎng)絡(luò)/文件);Buffer封裝了對數(shù)據(jù)的緩沖存儲,最后對于Selector則是提供了一種可以以單線程非阻塞的方式,來處理多個連接。

基本應(yīng)用示例

NIO的基本步驟是,創(chuàng)建Selector和ServerSocketChannel,然后注冊channel的ACCEPT事件,調(diào)用select方法,等待連接的到來,以及接收連接后將其注冊到Selector中。下面的為Echo Server的示例:

public class SelectorDemo {    public static void main(String[] args) throws IOException {


        Selector selector = Selector.open();
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.bind(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);        while (true) {            int ready = selector.select();            if (ready == 0) {                continue;
            } else if (ready < 0) {                break;
            }

            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();            while (iterator.hasNext()) {

                SelectionKey key = iterator.next();                if (key.isAcceptable()) {

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel accept = channel.accept();                    if (accept == null) {                        continue;
                    }
                    accept.configureBlocking(false);
                    accept.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {                    // 讀事件
                    deal((SocketChannel) key.channel(), key);
                } else if (key.isWritable()) {                    // 寫事件
                    resp((SocketChannel) key.channel(), key);
                }                // 注:處理完成后要從中移除掉
                iterator.remove();
            }
        }
        selector.close();
        socketChannel.close();
    }    private static void deal(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer responseBuffer = ByteBuffer.allocate(1024);        int read = channel.read(buffer);        if (read > 0) {
            buffer.flip();
            responseBuffer.put(buffer);
        } else if (read == -1) {
            System.out.println("socket close");
            channel.close();            return;
        }

        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        key.attach(responseBuffer);
    }    private static void resp(SocketChannel channel, SelectionKey key) throws IOException {

        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();

        channel.write(buffer);        if (!buffer.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

Linux下的NIO系統(tǒng)調(diào)用介紹

在Linux環(huán)境下,提供了幾種方式可以實現(xiàn)NIO,如epoll,poll,select等。對于select/poll,每次調(diào)用,都是從外部傳入FD和監(jiān)聽事件,這就導(dǎo)致每次調(diào)用的時候,都需要將這些數(shù)據(jù)從用戶態(tài)復(fù)制到內(nèi)核態(tài),就導(dǎo)致了每次調(diào)用代價比較大,而且每次從select/poll返回回來,都是全量的數(shù)據(jù),需要自行去遍歷檢查哪些是READY的。對于epoll,則為增量式的,系統(tǒng)內(nèi)部維護了所需要的FD和監(jiān)聽事件,要注冊的時候,調(diào)用epoll_ctl即可,而每次調(diào)用,不再需要傳入了,返回的時候,只返回READY的監(jiān)聽事件和FD。下面作個簡單的偽代碼:
具體的可以看以前的文章:http://www.cnblogs.com/jabnih/category/724636.html

// 1. 創(chuàng)建server socket// 2. 綁定地址// 3. 監(jiān)聽端口// 4. 創(chuàng)建epollint epollFd = epoll_create(1024);// 5. 注冊監(jiān)聽事件struct epoll_event event;
event.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
event.data.fd = serverFd;
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event);while(true) {
    readyNums = epoll_wait( epollFd, events, 1024, -1 );    
    if ( readyNums < 0 )
     {         printf("epoll_wait error\n");         exit(-1);
     }     for ( i = 0; i <  readyNums; ++i)
     {         if ( events[i].data.fd == serverFd )
         {
             clientFd = accept( serverFd, NULL, NULL );             // 注冊監(jiān)聽事件
             ...
         }else if ( events[i].events & EPOLLIN )
         {            // 處理讀事件
         }else if ( events[i].events & EPOLLRDHUP )
         {            // 關(guān)閉連接事件
            close( events[i].data.fd );
         }
}

Selector原理

SelectionKey

從Java頂層使用者角度來看,channel通過注冊,返回SelectionKey,而Selector.select方法,也是通過返回SelectionKey來使用。那么這里為什么會需要這個類呢?這個類有什么作用?無論是任何語言,其實都脫離不了系統(tǒng)底層的支持,通過上述Linux下的基本應(yīng)用,可以知道,通過系統(tǒng)調(diào)用,向其傳遞和返回的都是FD以及事件這些參數(shù),那么站在設(shè)計角度來看,就需要有一個映射關(guān)系,使得可以關(guān)聯(lián)起來,這里有Channel封裝的是通過,如果將READY事件這些參數(shù)放在里面,不太合適,這個時候,SelectionKey出現(xiàn)了,在SelectionKey內(nèi)部,保存Channel的引用以及一些事件信息,然后Selector通過FD找到SelectionKey來進行關(guān)聯(lián)。在底層EP里面,就有一個屬性:Map<Integer,SelectionKeyImpl> fdToKey。

EPollSelectorImpl

在Linux 2.6+版本,Java NIO采用的epoll(即EPollSelectorImpl類),對于2.4.x的,則使用poll(即PollSelectorImpl類),這里以epoll為例。

select方法

頂層Selector,通過調(diào)用select方法,最終會調(diào)用到EPollSelectorImpl.doSelect方法,通過該方法,可以看到,其首先會處理一些不再注冊的事件,調(diào)用pollWrapper.poll(timeout);,然后再進行一次清理,最后,可以看到需要處理映射關(guān)系

protected int doSelect(long timeout)
    throws IOException{    if (closed)        throw new ClosedSelectorException();    // 處理一些不再注冊的事件
    processDeregisterQueue();    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }    // 再進行一次清理
    processDeregisterQueue();    int numKeysUpdated = updateSelectedKeys();    if (pollWrapper.interrupted()) {        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }    return numKeysUpdated;
}private int updateSelectedKeys() {    int entries = pollWrapper.updated;    int numKeysUpdated = 0;    for (int i=0; i<entries; i++) {        // 獲取FD
        int nextFD = pollWrapper.getDescriptor(i);        // 根據(jù)FD找到對應(yīng)的SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));        // ski is null in the case of an interrupt
        if (ski != null) {            // 找到該FD的READY事件
            int rOps = pollWrapper.getEventOps(i);            if (selectedKeys.contains(ski)) {                // 將底層的事件轉(zhuǎn)換為Java封裝的事件,SelectionKey.OP_READ等
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {                // 沒有在原有的SelectedKey里面,說明是在等待過程中加入的
                ski.channel.translateAndSetReadyOps(rOps, ski);                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {                    // 需要更新selectedKeys集合
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }    // 返回Ready的Channel個數(shù)
    return numKeysUpdated;
}

EPollArrayWrapper

EpollArrayWrapper封裝了底層的調(diào)用,里面包含幾個native方法,如:

private native int epollCreate();private native void epollCtl(int epfd, int opcode, int fd, int events);private native int epollWait(long pollAddress, int numfds, long timeout,                             int epfd) throws IOException;

在openjdk的native目錄(native/sun/nio/ch)里面可以找到對應(yīng)的實現(xiàn)EPollArrayWrapper.c。
(這里順帶提一下,要實現(xiàn)native方法,可以在類里的方法加上native關(guān)鍵字,然后編譯成class文件,再轉(zhuǎn)換輸出.h,c/c++底層實現(xiàn)該頭文件的方法,編譯成so庫,放到對應(yīng)目錄即可)
在初始化文件方法里面,可以看到,是通過動態(tài)解析加載進來的,最終調(diào)用的epoll_create等方法。

JNIEXPORT void JNICALLJava_sun_nio_ch_EPollArrayWrapper_init(JNIEnv *env, jclass this){
    epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
    epoll_ctl_func    = (epoll_ctl_t)    dlsym(RTLD_DEFAULT, "epoll_ctl");
    epoll_wait_func   = (epoll_wait_t)   dlsym(RTLD_DEFAULT, "epoll_wait");    if ((epoll_create_func == NULL) || (epoll_ctl_func == NULL) ||
        (epoll_wait_func == NULL)) {
        JNU_ThrowInternalError(env, "unable to get address of epoll functions, pre-2.6 kernel?");
    }
}

Channel和Buffer之間的堆外內(nèi)存

經(jīng)常會聽見別人說,堆外內(nèi)存容易泄漏,以及Netty框架里面采用了堆外內(nèi)存,減少拷貝提高性能。那么這里面的堆外內(nèi)存指的是什么?之前懷著一個好奇心,通過read方法,最后追蹤到SocketChannelImpl里面read方法,里面調(diào)用了IOUtil的read方法。里面會首先判斷傳入的Buffer是不是DirectBuffer,如果不是(則是HeapByteBuffer),則會創(chuàng)建一個臨時的DirectBuffer,然后再將其復(fù)制到堆內(nèi)。IOUtil.read方法:

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4, Object var5) throws IOException {    if(var1.isReadOnly()) {        throw new IllegalArgumentException("Read-only buffer");
    } else if(var1 instanceof DirectBuffer) {        // 為堆外內(nèi)存,則直接讀取
        return readIntoNativeBuffer(var0, var1, var2, var4, var5);
    } else {        // 為堆內(nèi)內(nèi)存,先獲取臨時堆外內(nèi)存
        ByteBuffer var6 = Util.getTemporaryDirectBuffer(var1.remaining());        int var8;        try {            // 讀取到堆外內(nèi)存
            int var7 = readIntoNativeBuffer(var0, var6, var2, var4, var5);
            var6.flip();            if(var7 > 0) {                // 復(fù)制到堆內(nèi)
                var1.put(var6);
            }

            var8 = var7;
        } finally {            // 釋放臨時堆外內(nèi)存
            Util.offerFirstTemporaryDirectBuffer(var6);
        }        return var8;
    }
}

這里有一個問題就是,為什么會需要DirectBuffer以及堆外內(nèi)存?通過對DirectByteBuffer的創(chuàng)建來分析,可以知道,通過unsafe.allocateMemory(size);來分配內(nèi)存的,而對于該方法來說,可以說是直接調(diào)用malloc返回,這一塊內(nèi)存是不受GC管理的,也就是所說的:堆外內(nèi)存容易泄漏。但是對于使用DirectByteBuffer來說,會創(chuàng)建一個Deallocator,注冊到Cleaner里面,當對象被回收的時候,則會被直接,從而釋放掉內(nèi)存,減少內(nèi)存泄漏。要用堆外內(nèi)存,從上面的創(chuàng)建來看,堆外內(nèi)存創(chuàng)建后,以long型地址保存的,而堆內(nèi)內(nèi)存會受到GC影響,對象會被移動,如果采用堆內(nèi)內(nèi)存,進行系統(tǒng)調(diào)用的時候,那么GC就需要停止,否則就會有問題,基于這一點,采用了堆外內(nèi)存(這一塊參考了R大的理解:https://www.zhihu.com/question/57374068)。

注:堆外內(nèi)存的創(chuàng)建(unsafe.cpp):

// 僅僅作了對齊以及將長度放在數(shù)組前方就返回了UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");  size_t sz = (size_t)size;  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }  if (sz == 0) {    return 0;
  }
  sz = round_to(sz, HeapWordSize);  void* x = os::malloc(sz);  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

http://www.cnblogs.com/jabnih/p/7076465.html