作为后台开发,高并发和高性能问题,是最常需要面对的。而解决高并发和高性能问题,选择合适的I/O模型是必由之路。本文将对各种I/O模型作基本介绍,并以相应的java代码实现为例。

在介绍I/O模型之前,我们需要对操作系统的I/O操作流程有一个基本的了解,继而理解同步\异步、阻塞\非阻塞这两个关于I/O操作的不同概念。对于操作系统I/O操作流程,在这不作详细介绍,具体可以参考《深入理解计算机系统》一书,概要说明以下几点:

  • 操作系统的运行区间分为用户态和内核态,用户态即用户应用程序运行的运行空间,只能访问有限的内存,不能访问外围I/O设备,如硬盘、网卡等,内核态可以访问所有内存区域以及所有外围I/O设备

  • 用户态的应用程序访问I/O需要通过发起系统调用,由内核线程(指令)来完成

  • 内核线程完成相应I/O操作(读取/写入),数据需要从内核态复制到用户空间的内存,应用程序从内存获取数据,才能继续完成相应业务逻辑的执行。

同步\异步、阻塞\非阻塞这两个概念经常拿在一起说,容易导致混淆,若不能清晰地理解这两个概念,理解I/O模型也会变得困难。

  • 同步:指用户线程发起了I/O请求后,需一直等待或轮询内核I/O操作完成后,才会继续执行

  • 异步:指用户线程发起I/O请求后依然继续执行,当内核完成I/O操作后会通知用户线程,或者回调用户线程已注册的回调函数。

  • 阻塞:指只有内核的I/O操作彻底完成后,才会返回用户空间。

  • 非阻塞:指I/O操作被调用后,立即返回一个状态值,无需等到I/O操作彻底完成。

概念上的表述,理解起来可能不太直观,我们以以下的例子来类比说明:

周末晚上,我去楼下的快餐点打包外卖,点完餐后,老板给了我一张纸质单,让我等会凭这个单子来取外卖,由于我并不知道什么时候外卖能准备后,只能每隔几分钟去前台问一下是否可以了。在等外卖的过程中,我又想去隔壁的甜品店打包一个甜品,点完甜品后,甜品店给了我一个蓝牙电子通知器,在一定范围里,当甜品准备好了,电子通知器会振动,告知我去取甜品即可。所以最后的结果就是,我在快餐店不断询问快餐是否准备好了,同时等着甜品店的电子通知器振动。

在这个故事里,快餐店接受我的点餐请求后,我并不需要一直等待,还可以去隔壁点甜点,这是一种非阻塞模型,但是我还是需要不断去轮询餐点准备状态,这又是一种同步模型。而甜品店采取主动通知的方式,我并不需要去询问甜品的就绪状态,只需要等待甜品店的主动通知,这是一种典型的异步方式。

也由此可知,阻塞\非阻塞模型其实都是同步的,而只有采取特需的通知(回调)方式,才能真正实现异步的效果。
I/O模型
接下来我们基于同步\异步、阻塞\非阻塞的概念,介绍各个I/O模型,并以一个简单的进行socket读写程序的java代码来说明。

同步阻塞I/O

概念

同步阻塞I/O是编程方式最为简单的I/O操作,它是指在用户线程发起I/O操作后,会一直阻塞,直至内核完成I/O操作,将数据复制到用户空间下后才返回继续执行。

编程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SyncBlockedEcho {
public static void main(String[] args){
BufferedReader in=null;
Socket socket=null;
ServerSocket serverSocket=null;
try {
serverSocket = new ServerSocket(8080);
// 当前用户线程获取socket
socket = serverSocket.accept();
in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
String inputLine;
//阻塞直读取完成
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//do close
}
}
}

同步阻塞I/O作为最简单的一种I/O模型,其最大问题是用户线程在整个I/O操作过程中是被阻塞的,也就是说在I/O操作过程中,用户线程不能做任何事情,而I/O操作并不需要过多CPU参与,也就以为着这种模式对CPU的利用率明显不高。同时当前用户线程由于被阻塞,并不能继续接受新的socket连接,不适合高并发的场景,鉴于此,我们可以采取多线程的模式作为改善。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SyncBlockedThreadPoolEcho {
public static void main(String[] args){
ServerSocket serverSocket=null;
ExecutorService pool = Executors.newFixedThreadPool(5);
try {
serverSocket = new ServerSocket(8080);
// 当前用户线程获取socket
while (true) {
final Socket socket = serverSocket.accept();
pool.submit(new Thread(new Runnable() {
public void run() {
try {
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
String inputLine;
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
}
}catch (IOException ex){
ex.printStackTrace();
}finally {
//do close
}
}
}));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//do close
}
}
}

采用多线程的模式,解决了主线程因为被阻塞而不能接受新连接的问题,同时,由于可以采取线程池之类的技术,避免线程的重复创建、销毁等带来的性能损失,所以具有了更高的并发处理能力。但是这种模式依然受限于I/O操作的同步与阻塞,面对大并发的场景,需要大量的线程来维持连接,而线程越多,会引起内存占用(每个线程都有最小的内存分配要求)、CPU利用率下降(线程之间切换频繁)的问题。

同步非阻塞I/O

概念

同步非阻塞I/O是指请求发起后,当前用户线程发起I/O请求后,不会在调用上阻塞,而是立即返回,一般可以通过将socket设置为NONBLOCK的方式来达到此目的。但是由于需要不断地轮询socket的就绪状态,CPU将会浪费在状态轮询上,直到数据就绪,都是无用功。也就是说,当前线程会阻塞在如下的while循环上,实际上起不到真正的非阻塞效果。

1
2
3
while(socket.read(buf)){
process(buf);
}

多路复用

为了解决这个问题,于是有了I/O多路复用。I/O多路复用建立在操作系统内核提供的select系统调用基础之上,当然他还是需要轮询,只是轮询的对象从socket的就绪状态变为了select系统调用的状态返回,但是他的优势是因为select可以同时返回多个就绪状态的socket,也就是意味着在同一个用户线程,可以同时进行多个I/O请求,而不是上述模型的在单个I/O请求上阻塞,这也是模型概念里“多路”的含义了。我们用以下代码来表述这一特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class NonBlcokedEcho {
public static void main(String[] args){
BufferedReader in=null;
Socket socket=null;
ServerSocket serverSocket=null;
ServerSocketChannel serverChannel = null ;
Selector selector = null;
try {
try {
serverChannel = ServerSocketChannel.open();
ServerSocket innerSocket = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(8080);
innerSocket.bind(address);
//设置socket为非阻塞
serverChannel.configureBlocking(false);
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}catch (IOException ex){
ex.printStackTrace();
}
while (true) {
try {
//发起select调用
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
}
//返回多个就绪状态的socket
Set<SelectionKey> keys = selector.selectedKeys();
Iterator iterator = keys.iterator() ;
//循环处理多个就绪socket
while (iterator.hasNext()){
SelectionKey key=(SelectionKey)iterator.next();
iterator.remove();
if(key.isAcceptable()){
//接受新连接
}else if(key.isReadable()){
//读取数据
}
else if(key.isWritable()){
//写入数据
}
}
}

}catch (Exception ex){
ex.printStackTrace();
}finally {
//do close
}
}
}

异步I/O

前面说到的I/O模型,无论是阻塞还是非阻塞模型,都是同步的,而真正的异步I/O需要借助以下特殊的系统API来实现,这其中包括Linux下的AIO,windows/.NET下的BeginInvoke/EndInvoke编程模型等等,Java SE 7后也引入NIO的支持。其基本思想是通过告知内核一些上下文状态信息,注册回调函数,当内核完成I/O操作,并将数据复制到缓冲区后,通过唤起回调函数,达到通知应用程序使用数据的效果,由此可见,异步I/O肯定也是非阻塞的。以下以JAVA NIO的代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class AsyncEcho {
public static int DEFAULT_PORT = 7;
public static void main(String[] args) throws IOException {

ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
try {
AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
if (asynchronousServerSocketChannel.isOpen()) {
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("Waiting for connections ...");
while (true) {
Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel
.accept();
try {
final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture
.get();
Callable<String> worker = new Callable<String>() {
public String call() throws Exception {
String host = asynchronousSocketChannel.getRemoteAddress().toString();
System.out.println("Incoming connection from: " + host);
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
// transmitting data
while (asynchronousSocketChannel.read(buffer).get() != -1) {
buffer.flip();
asynchronousSocketChannel.write(buffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
asynchronousSocketChannel.close();
System.out.println(host + " was successfully served!");
return host;
}
};
taskExecutor.submit(worker);
} catch (Exception ex) {
ex.printStackTrace();
break;
}
}
} else {
System.out.println("The asynchronous server-socket channel cannot be opened!");
}
} catch (IOException ex) {
System.err.println(ex);
}
}
}

总结:

  • 同步/异步与阻塞/非阻塞是两个不同的概念。

  • 阻塞/非阻塞I/O一般都是同步的,而异步I/O一般都是非阻塞的。

  • 同步I/O要达到并发的效果,一般采用多路复用模型(select/epoll)。

  • 异步I/O需要系统内核的特别支持,提供异步操作的API。