Java NIO中涉及的基础内容:

1 基本概念

  • Channel 通道

Channel 通道,类似于流。一个Channel可以和文件或者网络socket对应;以对应socket为例,往Channel中写数据等同于往Socket中写入数据。

  • Buffer 缓冲区

类似于一块内存区域或者字节数组,在NIO的使用中只有将数据打包成Buffer形式才能往Channel写入或者读取数据。

  • Selector 选择器

Selector是一个Channel管理器,可以由一个线程管理,可以管理多个Channel对象,也就是对应的网络socket。

2 方法原理
SelectableChannel 是Channel的一个抽象类实现,表示可被选择的通道。任何一个SelectableChannel都可以将自己注册到Selector中,这样这个Channel就能被Selector管理。当与客户端连接的数据还没有准备好时,Selector会处于等待状态;当SelectableChannel的数据准备好时,Selector就会接到通知,得到那些数据已经准备好的Channel。

package testnio;

import android.text.Selection;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * A single-selector NIO echo server.
 *
 * <p>One thread runs the selector loop in {@link #startServer()}: it accepts
 * connections, reads incoming bytes and writes queued replies. Each chunk that
 * is read is handed to a thread pool task ({@link HandleMessage}), which queues
 * the buffer on the connection's {@code EchoClientList} attachment and asks the
 * selector to watch the key for writability.
 *
 * <p>NOTE(review): the per-connection output queue is filled from pool threads
 * and drained from the selector thread; this presumably relies on
 * {@code EchoClientList} doing its own synchronization - confirm.
 */
public class NIOEchoServer {
    private Selector selector;
    private ExecutorService threadPool = Executors.newCachedThreadPool();

    /**
     * Timestamp (ms) of the first pending read per client socket, used to report
     * how long a request took. Only touched from the selector thread.
     */
    public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);

    /**
     * Opens the selector, binds a non-blocking server channel to port 8002 on
     * the local host address and runs the dispatch loop. Does not return normally.
     *
     * @throws IOException if the selector or server channel cannot be set up,
     *                     or if {@code select()} fails
     */
    public void startServer() throws IOException {
        // Obtain the selector instance.
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();

        // The channel must be non-blocking to be registered with a selector.
        ssc.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 8002);

        // Bind the channel to port 8002.
        ssc.socket().bind(isa);

        // Register the server channel for OP_ACCEPT so the selector reports
        // incoming client connections.
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        for (;;) {
            selector.select();

            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> i = readyKeys.iterator();
            while (i.hasNext()) {
                SelectionKey sk = i.next();
                // Selected keys must be removed by hand or they are reported again.
                i.remove();

                // The readiness tests below throw CancelledKeyException on a
                // cancelled key, so check validity once up front.
                if (!sk.isValid()) {
                    continue;
                }

                if (sk.isAcceptable()) {
                    // A new client is connecting.
                    doAccept(sk);
                } else if (sk.isReadable()) {
                    Socket skSocket = ((SocketChannel) sk.channel()).socket();
                    if (!time_stat.containsKey(skSocket)) {
                        time_stat.put(skSocket, System.currentTimeMillis());
                    }
                    // Read the incoming data.
                    doRead(sk);
                } else if (sk.isWritable()) {
                    // Write queued data back to the client.
                    doWrite(sk);

                    // Report how long the request took, if a start time was recorded.
                    long doEndTimestamp = System.currentTimeMillis();
                    Socket skSocket = ((SocketChannel) sk.channel()).socket();
                    // remove() returns null when no start time exists (e.g. a second
                    // writable event for the same read); do not unbox blindly.
                    Long doStartTimestamp = time_stat.remove(skSocket);
                    if (doStartTimestamp != null) {
                        System.out.println("NIO Server spend: " + (doEndTimestamp - doStartTimestamp) + "ms");
                    }
                }
            }

        }
    }

    /**
     * Writes the buffer at the tail of the connection's output queue to the
     * client. A fully written buffer is removed from the queue; once the queue
     * is empty the key goes back to being interested in reads only.
     *
     * @param sk key of the writable client channel
     */
    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClientList echoClientList = (EchoClientList) sk.attachment();
        LinkedList<ByteBuffer> outq = echoClientList.getOutputQueue();

        // Nothing queued: stop watching for writability instead of letting
        // getLast() throw NoSuchElementException.
        if (outq.isEmpty()) {
            sk.interestOps(SelectionKey.OP_READ);
            return;
        }

        ByteBuffer bb = outq.getLast();
        try {
            int len = channel.write(bb);
            if (len < 0) {
                disconnect(sk);
                return;
            }

            if (bb.remaining() == 0) {
                outq.removeLast();
            }

        } catch (IOException e) {
            System.out.println("Failed to write to client");
            e.printStackTrace();
            disconnect(sk);
            // The key is cancelled now; touching its interest set would throw.
            return;
        }

        if (outq.isEmpty()) {
            sk.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Accepts a pending client connection, switches it to non-blocking mode and
     * registers it with the selector for reads, with a fresh
     * {@code EchoClientList} attached.
     *
     * @param sk key of the server channel that is ready to accept
     */
    private void doAccept(SelectionKey sk) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel = null;

        try {
            clientChannel = serverSocketChannel.accept();
            // A non-blocking accept() returns null when no connection is pending.
            if (clientChannel == null) {
                return;
            }
            clientChannel.configureBlocking(false);

            // Register the client channel for reads and attach its state in the
            // same call, so the key is never visible without an attachment.
            clientChannel.register(selector, SelectionKey.OP_READ, new EchoClientList());

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accept connection from " + clientAddress.getHostAddress());

        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak the accepted socket if setup failed part-way.
            if (clientChannel != null) {
                try {
                    clientChannel.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup; the original failure is already reported.
                }
            }
        }
    }

    /**
     * Reads up to 8192 bytes from the client and hands the data to the thread
     * pool. End-of-stream or an I/O error disconnects the client.
     *
     * @param sk key of the readable client channel
     */
    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;

        try {
            len = channel.read(bb);
            if (len < 0) {
                // Peer closed the connection.
                disconnect(sk);
                return;
            }
        } catch (IOException e) {
            System.out.println("Failed to read from client.");
            e.printStackTrace();
            disconnect(sk);
            return;
        }

        // Switch the buffer from filling to draining before passing it on.
        bb.flip();
        threadPool.execute(new HandleMessage(sk, bb));
    }

    /**
     * Tears down a client connection: cancels the key, drops any pending timing
     * entry and closes the channel.
     *
     * @param sk key of the connection to close
     */
    private void disconnect(SelectionKey sk) {
        sk.cancel();
        if (sk.channel() instanceof SocketChannel) {
            time_stat.remove(((SocketChannel) sk.channel()).socket());
        }
        try {
            sk.channel().close();
        } catch (IOException e) {
            System.out.println("Failed to close client channel");
            e.printStackTrace();
        }
    }

    /**
     * Pool task that queues one received buffer for echoing back and switches
     * the connection's key to read + write interest.
     */
    class HandleMessage implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        public HandleMessage(SelectionKey sk, ByteBuffer bb) {
            this.sk = sk;
            this.bb = bb;
        }

        @Override
        public void run() {
            // The connection may have been closed while this task was waiting.
            if (!sk.isValid()) {
                return;
            }
            EchoClientList echoClientList = (EchoClientList) sk.attachment();
            echoClientList.enqueue(bb);
            try {
                sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            } catch (java.nio.channels.CancelledKeyException e) {
                // Key was cancelled between the validity check and this call;
                // nobody is left to receive the echo.
                return;
            }
            // Force the selector to return immediately so the new interest set
            // takes effect.
            selector.wakeup();
        }
    }
}

Tags: Java, 网络编程, NIO

Related Posts:

Leave a Comment