作为后台开发,高并发和高性能问题,是最常需要面对的。而解决高并发和高性能问题,选择合适的I/O模型是必由之路。本文将对各种I/O模型作基本介绍,并以相应的java代码实现为例。

在介绍I/O模型之前,我们需要对操作系统的I/O操作流程有一个基本的了解,继而理解同步\异步、阻塞\非阻塞这两个关于I/O操作的不同概念。对于操作系统I/O操作流程,在这不作详细介绍,具体可以参考《深入理解计算机系统》一书,概要说明以下几点:

  • 操作系统的运行区间分为用户态和内核态,用户态即用户应用程序运行的运行空间,只能访问有限的内存,不能访问外围I/O设备,如硬盘、网卡等,内核态可以访问所有内存区域以及所有外围I/O设备

  • 用户态的应用程序访问I/O需要通过发起系统调用,由内核线程(指令)来完成

  • 内核线程完成相应I/O操作(读取/写入),数据需要从内核态复制到用户空间的内存,应用程序从内存获取数据,才能继续完成相应业务逻辑的执行。

同步\异步、阻塞\非阻塞这两个概念经常拿在一起说,容易导致混淆,若不能清晰地理解这两个概念,理解I/O模型也会变得困难。

  • 同步:指用户线程发起了I/O请求后,需一直等待或轮询内核I/O操作完成后,才会继续执行

  • 异步:指用户线程发起I/O请求后依然继续执行,当内核完成I/O操作后会通知用户线程,或者回调用户线程已注册的回调函数。

  • 阻塞:指只有内核的I/O操作彻底完成后,才会返回用户空间。

  • 非阻塞:指I/O操作被调用后,立即返回一个状态值,无需等到I/O操作彻底完成。

概念上的表述,理解起来可能不太直观,我们以以下的例子来类比说明:

周末晚上,我去楼下的快餐点打包外卖,点完餐后,老板给了我一张纸质单,让我等会凭这个单子来取外卖,由于我并不知道什么时候外卖能准备后,只能每隔几分钟去前台问一下是否可以了。在等外卖的过程中,我又想去隔壁的甜品店打包一个甜品,点完甜品后,甜品店给了我一个蓝牙电子通知器,在一定范围里,当甜品准备好了,电子通知器会振动,告知我去取甜品即可。所以最后的结果就是,我在快餐店不断询问快餐是否准备好了,同时等着甜品店的电子通知器振动。

在这个故事里,快餐店接受我的点餐请求后,我并不需要一直等待,还可以去隔壁点甜点,这是一种非阻塞模型,但是我还是需要不断去轮询餐点准备状态,这又是一种同步模型。而甜品店采取主动通知的方式,我并不需要去询问甜品的就绪状态,只需要等待甜品店的主动通知,这是一种典型的异步方式。

也由此可知,阻塞\非阻塞模型其实都是同步的,而只有采取特需的通知(回调)方式,才能真正实现异步的效果。 I/O模型 接下来我们基于同步\异步、阻塞\非阻塞的概念,介绍各个I/O模型,并以一个简单的进行socket读写程序的java代码来说明。

同步阻塞I/O

概念

同步阻塞I/O是编程方式最为简单的I/O操作,它是指在用户线程发起I/O操作后,会一直阻塞,直至内核完成I/O操作,将数据复制到用户空间下后才返回继续执行。

编程示例

public class SyncBlockedEcho {
    public static void main(String[] args){
        BufferedReader in=null;
        Socket socket=null;
        ServerSocket serverSocket=null;
        try {
            serverSocket =  new ServerSocket(8080);
            // 当前用户线程获取socket
            socket = serverSocket.accept();
            in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            String inputLine;
            //阻塞直读取完成
            while ((inputLine = in.readLine()) != null) {
                System.out.println(inputLine);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //do close
        }
    }
}

同步阻塞I/O作为最简单的一种I/O模型,其最大问题是用户线程在整个I/O操作过程中是被阻塞的,也就是说在I/O操作过程中,用户线程不能做任何事情,而I/O操作并不需要过多CPU参与,也就以为着这种模式对CPU的利用率明显不高。同时当前用户线程由于被阻塞,并不能继续接受新的socket连接,不适合高并发的场景,鉴于此,我们可以采取多线程的模式作为改善。

public class SyncBlockedThreadPoolEcho {
    public static void main(String[] args){
        ServerSocket serverSocket=null;
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            serverSocket =  new ServerSocket(8080);
            // 当前用户线程获取socket
            while (true) {
               final Socket  socket = serverSocket.accept();
                pool.submit(new Thread(new Runnable() {
                    public void run() {
                        try {
                            BufferedReader in = new BufferedReader(
                                    new InputStreamReader(socket.getInputStream()));
                            String inputLine;
                            while ((inputLine = in.readLine()) != null) {
                                System.out.println(inputLine);
                            }
                        }catch (IOException ex){
                            ex.printStackTrace();
                        }finally {
                            //do close
                        }
                    }
                }));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //do close
        }
    }
}

采用多线程的模式,解决了主线程因为被阻塞而不能接受新连接的问题,同时,由于可以采取线程池之类的技术,避免线程的重复创建、销毁等带来的性能损失,所以具有了更高的并发处理能力。但是这种模式依然受限于I/O操作的同步与阻塞,面对大并发的场景,需要大量的线程来维持连接,而线程越多,会引起内存占用(每个线程都有最小的内存分配要求)、CPU利用率下降(线程之间切换频繁)的问题。

##同步非阻塞I/O

概念

同步非阻塞I/O是指请求发起后,当前用户线程发起I/O请求后,不会在调用上阻塞,而是立即返回,一般可以通过将socket设置为NONBLOCK的方式来达到此目的。但是由于需要不断地轮询socket的就绪状态,CPU将会浪费在状态轮询上,直到数据就绪,都是无用功。也就是说,当前线程会阻塞在如下的while循环上,实际上起不到真正的非阻塞效果。

while(socket.read(buf)){
  process(buf);
}

多路复用

为了解决这个问题,于是有了I/O多路复用。I/O多路复用建立在操作系统内核提供的select系统调用基础之上,当然他还是需要轮询,只是轮询的对象从socket的就绪状态变为了select系统调用的状态返回,但是他的优势是因为select可以同时返回多个就绪状态的socket,也就是意味着在同一个用户线程,可以同时进行多个I/O请求,而不是上述模型的在单个I/O请求上阻塞,这也是模型概念里“多路”的含义了。我们用以下代码来表述这一特性:

public class NonBlcokedEcho {
    public static void main(String[] args){
        BufferedReader in=null;
        Socket socket=null;
        ServerSocket serverSocket=null;
        ServerSocketChannel serverChannel = null  ;
        Selector selector = null;
        try {
            try {
                serverChannel = ServerSocketChannel.open();
                ServerSocket innerSocket = serverChannel.socket();
                InetSocketAddress address = new InetSocketAddress(8080);
                innerSocket.bind(address);
                //设置socket为非阻塞
                serverChannel.configureBlocking(false);
                selector = Selector.open();
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            }catch (IOException ex){
                ex.printStackTrace();
            }
            while (true) {
                try {
                    //发起select调用
                    selector.select();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
                //返回多个就绪状态的socket
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator iterator = keys.iterator() ;
                //循环处理多个就绪socket
                while (iterator.hasNext()){
                    SelectionKey key=(SelectionKey)iterator.next();
                    iterator.remove();
                    if(key.isAcceptable()){
                        //接受新连接
                    }else if(key.isReadable()){
                        //读取数据
                    }
                    else if(key.isWritable()){
                        //写入数据
                    }
                }
            }

        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            //do close
        }
    }
}

异步I/O

前面说到的I/O模型,无论是阻塞还是非阻塞模型,都是同步的,而真正的异步I/O需要借助以下特殊的系统API来实现,这其中包括Linux下的AIO,windows/.NET下的BeginInvoke/EndInvoke编程模型等等,Java SE 7后也引入NIO的支持。其基本思想是通过告知内核一些上下文状态信息,注册回调函数,当内核完成I/O操作,并将数据复制到缓冲区后,通过唤起回调函数,达到通知应用程序使用数据的效果,由此可见,异步I/O肯定也是非阻塞的。以下以JAVA NIO的代码为例:

public class AsyncEcho {
    public static int DEFAULT_PORT = 7;
    public static void main(String[] args) throws IOException {

        ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
        try  {
            AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
            if (asynchronousServerSocketChannel.isOpen()) {
                asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
                asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
                System.out.println("Waiting for connections ...");
                while (true) {
                    Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
                            .accept();
                    try {
                        final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
                                .get();
                        Callable<String> worker = new Callable<String>() {
                            public String call() throws Exception {
                                String host = asynchronousSocketChannel.getRemoteAddress().toString();
                                System.out.println("Incoming connection from: " + host);
                                final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                                // transmitting data
                                while (asynchronousSocketChannel.read(buffer).get() != -1) {
                                    buffer.flip();
                                    asynchronousSocketChannel.write(buffer).get();
                                    if (buffer.hasRemaining()) {
                                        buffer.compact();
                                    } else {
                                        buffer.clear();
                                    }
                                }
                                asynchronousSocketChannel.close();
                                System.out.println(host + " was successfully served!");
                                return host;
                            }
                        };
                        taskExecutor.submit(worker);
                    } catch (Exception ex) {
                       ex.printStackTrace();
                        break;
                    }
                }
            } else {
                System.out.println("The asynchronous server-socket channel cannot be opened!");
            }
        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}

总结:

  • 同步/异步与阻塞/非阻塞是两个不同的概念。

  • 阻塞/非阻塞I/O一般都是同步的,而异步I/O一般都是非阻塞的。

  • 同步I/O要达到并发的效果,一般采用多路复用模型(select/epoll)。

  • 异步I/O需要系统内核的特别支持,提供异步操作的API。