转载

Java阻塞IO与非阻塞IO

JAVA SpringMVC+mybatis(oracle 和 mysql) HTML5 全新高大尚后台框架 bootstrap

IO:IO 是主存和外部设备 ( 硬盘、终端和网络等 ) 拷贝数据的过程。 IO 是操作系统的底层功能实现,底层通过 I/O 指令进行完成。

阻塞与非阻塞:一辆从 A 开往 B 的公共汽车上,路上有很多点可能会有人下车。司机不知道哪些点会有哪些人会下车,对于需要下车的人,如何处理更好?

  1. 司机过程中定时询问每个乘客是否到达目的地,若有人说到了,那么司机停车,乘客下车。 ( 类似阻塞式 )
  2. 每个人告诉售票员自己的目的地,然后睡觉,司机只和售票员交互,到了某个点由售票员通知乘客下车。 ( 类似非阻塞 )
    很显然,每个人要到达某个目的地可以认为是一个线程,司机可以认为是 CPU 。在阻塞式里面,每个线程需要不断的轮询,上下文切换,以达到找到目的地的结果。而在非阻塞方式里,每个乘客 ( 线程 ) 都在睡觉 ( 休眠 ) ,只在真正外部环境准备好了才唤醒,这样的唤醒肯定不会阻塞。

阻塞式I/O:(传统的IO)

以网络应用为例,在传统IO方式(阻塞IO)中需要监听一个ServerSocket,接受请求的连接为其提供服务(服务通常包括了处理请求并发送响应)下图是服务器的生命周期图,其中标有粗黑线条的部分表明会发生I/O阻塞。

Java阻塞IO与非阻塞IO

此方式在遇到多请求时,只能等待前面的请求完成后才能处理新的请求,所以通常在Java中处理阻塞I/O要用到线程(大量的线程)。代码如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

public class TCPServer {

public static void main(String[] args) {

try {

ServerSocket ss = new ServerSocket( );

System.out.println( "server start..." );

while ( true ) {

Socket s = ss.accept();

new LogicThread(s); //开一个线程来处理请求,这里面调用InputStream.read()读取请求信息

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

可以分析创建服务器的每个具体步骤。首先创建ServerSocket

?
1

ServerSocket server= new ServerSocket( );

然后接受新的连接请求

?
1

Socket newConnection=server.accept(); //阻塞

在LogicThread中处理请求

?
1
2
3
4
5
6
7
8
InputStream in = newConnection.getInputStream();

InputStreamReader reader = new InputStreamReader(in);

BufferedReader buffer = new BufferedReader(reader);

Request request = new Request();

while (!request.isComplete()) {

String line = buffer.readLine(); //阻塞

request.addLine(line);

}

生命周期如下图所示:

Java阻塞IO与非阻塞IO

传统IO方式(阻塞I/O)在调用InputStream.read()/buffer.readLine()方法时是阻塞的,它会一直等到数据到来或缓冲区已满时或超时时才会返回,并且产生了大量String类型垃圾,尽管可以使用StringBuffer优化;同样,在调用ServerSocket.accept()方法时,也会一直阻塞到有客户端连接才会返回,每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。并且多线程处理多个连接。每个线程拥有自己的栈空间并且占用一些 CPU 时间。每个线程遇到外部未准备好的时候,都会阻塞掉。阻塞的结果就是会带来大量的进程上下文切换。且大部分进程上下文切换可能是无意义的。比如假设一个线程监听某一个端口,一天只会有几次请求进来,但是该 cpu 不得不为该线程不断做上下文切换尝试,大部分的切换以阻塞告终。

非阻塞式I/O(NIO):也可以说成“New I/O”

核心类:

1.Buffer 为所有的原始类型提供 (Buffer) 缓存支持。

2.Charset 字符集编码解码解决方案

3.Channel 一个新的原始 I/O 抽象,用于读写Buffer类型,通道可以认为是一种连接,可以是到特定设备,程序或者是网络的连接。通道的类等级结构图如下

Java阻塞IO与非阻塞IO

图中ReadableByteChannel和WritableByteChannel分别用于读写。

GatheringByteChannel可以从使用一次将多个Buffer中的数据写入通道,相反的,ScatteringByteChannel则可以一次将数据从通道读入多个Buffer中。你还可以设置通道使其为阻塞或非阻塞I/O操作服务。

为了使通道能够同传统I/O类相容,Channel类提供了静态方法创建Stream或Reader

4.Selector

在过去的阻塞I/O中,我们一般知道什么时候可以向stream中读或写,因为方法调用直到stream准备好时返回。但是使用非阻塞通道,我们需要一些方法来知道什么时候通道准备好了。在NIO包中,设计Selector就是为了这个目的。SelectableChannel可以注册特定的事件,而不是在事件发生时通知应用,通道跟踪事件。然后,当应用调用Selector上的任意一个selection方法时,它查看注册了的通道看是否有任何感兴趣的事件发生。

下面是java NIO的工作原理:

  1. 由一个专门的线程来处理所有的 IO 事件,并负责分发。
  2. 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
  3. 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
    如下图所示:
    Java阻塞IO与非阻塞IO
    (注:每个线程的处理流程大概都是读取数据、解码、计算处理、编码、发送响应。)

Java NIO的服务端只需启动一个专门的线程来处理所有的 IO 事件。java NIO采用了双向通道(channel)进行数据传输,而不是单向的流(stream),在通道上可以注册我们感兴趣的事件。一共有以下四种事件:

服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16)

客户端连接服务端事件 SelectionKey.OP_CONNECT(8)

读事件 SelectionKey.OP_READ(1)

写事件 SelectionKey.OP_WRITE(4)

服务端和客户端各自维护一个管理通道的对象,我们称之为selector,该对象能检测一个或多个通道 (channel) 上的事件。我们以服务端为例,如果服务端的selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在selector中添加一个读事件。服务端的处理线程会轮询地访问selector,如果访问selector时发现有感兴趣的事件到达,则处理这些事件,如果没有感兴趣的事件到达,则处理线程会一直阻塞直到感兴趣的事件到达为止。

Java阻塞IO与非阻塞IO

完整的非阻塞IO实例

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/**

* NIO服务端

*/

public class NIOServer {

//通道管理器

private Selector selector;

/**

* 获得一个ServerSocket通道,并对该通道做一些初始化的工作

* @param port 绑定的端口号

* @throws IOException

*/

public void initServer( int port) throws IOException {

// 获得一个ServerSocket通道

ServerSocketChannel serverChannel = ServerSocketChannel.open();

// 设置通道为非阻塞

serverChannel.configureBlocking( false );

// 将该通道对应的ServerSocket绑定到port端口

serverChannel.socket().bind( new InetSocketAddress(port));

// 获得一个通道管理器

this .selector = Selector.open();

//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,

//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

}

/**

* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

* @throws IOException

*/

@SuppressWarnings ( "unchecked" )

public void listen() throws IOException {

System.out.println( "服务端启动成功!" );

// 轮询访问selector

while ( true ) {

//当注册的事件到达时,方法返回;否则,该方法会一直阻塞

selector.select();

// 获得selector中选中的项的迭代器,选中的项为注册的事件

Iterator ite = this .selector.selectedKeys().iterator();

while (ite.hasNext()) {

SelectionKey key = (SelectionKey) ite.next();

// 删除已选的key,以防重复处理

ite.remove();

// 客户端请求连接事件

if (key.isAcceptable()) {

ServerSocketChannel server = (ServerSocketChannel) key

.channel();

// 获得和客户端连接的通道

SocketChannel channel = server.accept();

// 设置成非阻塞

channel.configureBlocking( false );

//在这里可以给客户端发送信息哦

channel.write(ByteBuffer.wrap( new String( "向客户端发送了一条信息" ).getBytes()));

//在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。

channel.register( this .selector, SelectionKey.OP_READ);

// 获得了可读的事件

} else if (key.isReadable()) {

read(key);

}

}

}

}

/**

* 处理读取客户端发来的信息 的事件

* @param key

* @throws IOException

*/

public void read(SelectionKey key) throws IOException{

// 服务器可读取消息:得到事件发生的Socket通道

SocketChannel channel = (SocketChannel) key.channel();

// 创建读取的缓冲区

ByteBuffer buffer = ByteBuffer.allocate( );

channel.read(buffer);

byte [] data = buffer.array();

String msg = new String(data).trim();

System.out.println( "服务端收到信息:" +msg);

ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());

channel.write(outBuffer); // 将消息回送给客户端

}

/**

* 启动服务端测试

* @throws IOException

*/

public static void main(String[] args) throws IOException {

NIOServer server = new NIOServer();

server.initServer( );

server.listen();

}

} /**

* NIO客户端

*/

public class NIOClient {

//通道管理器

private Selector selector;

/**

* 获得一个Socket通道,并对该通道做一些初始化的工作

* @param ip 连接的服务器的ip

* @param port 连接的服务器的端口号

* @throws IOException

*/

public void initClient(String ip, int port) throws IOException {

// 获得一个Socket通道

SocketChannel channel = SocketChannel.open();

// 设置通道为非阻塞

channel.configureBlocking( false );

// 获得一个通道管理器

this .selector = Selector.open();

// 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调

//用channel.finishConnect();才能完成连接

channel.connect( new InetSocketAddress(ip,port));

//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。

channel.register(selector, SelectionKey.OP_CONNECT);

}

/**

* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理

* @throws IOException

*/

@SuppressWarnings ( "unchecked" )

public void listen() throws IOException {

// 轮询访问selector

while ( true ) {

selector.select();

// 获得selector中选中的项的迭代器

Iterator ite = this .selector.selectedKeys().iterator();

while (ite.hasNext()) {

SelectionKey key = (SelectionKey) ite.next();

// 删除已选的key,以防重复处理

ite.remove();

// 连接事件发生

if (key.isConnectable()) {

SocketChannel channel = (SocketChannel) key

.channel();

// 如果正在连接,则完成连接

if (channel.isConnectionPending()){

channel.finishConnect();

}

// 设置成非阻塞

channel.configureBlocking( false );

//在这里可以给服务端发送信息哦

channel.write(ByteBuffer.wrap( new String( "向服务端发送了一条信息" ).getBytes()));

//在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。

channel.register( this .selector, SelectionKey.OP_READ);

// 获得了可读的事件

} else if (key.isReadable()) {

read(key);

}

}

}

}

/**

* 处理读取服务端发来的信息 的事件

* @param key

* @throws IOException

*/

public void read(SelectionKey key) throws IOException{

// 服务器可读取消息:得到事件发生的Socket通道

SocketChannel channel = (SocketChannel) key.channel();

// 创建读取的缓冲区

ByteBuffer buffer = ByteBuffer.allocate( );

channel.read(buffer);

byte [] data = buffer.array();

String msg = new String(data).trim();

System.out.println( "客户端收到信息:" +msg);

}

/**

* 启动客户端测试

* @throws IOException

*/

public static void main(String[] args) throws IOException {

NIOClient client = new NIOClient();

client.initClient( "localhost" , );

client.listen();

}

}
正文到此结束
Loading...