对于Java IO模型的变化,描述最为清楚的莫属于Doug Lea对Reactor模型的讲解 《Scalable IO in Java》 。本文则主要围绕该文档,对Java IO模型的演变过程进行讲解,并且会讲解各个模型所解决的问题以及其存在的问题。最后,本文也会以一个实际的例子来实现Reactor模型。
对于传统IO模型,其主要是一个Server对接N个客户端,在客户端连接之后,为每个客户端都分配一个执行线程。如下图是该模型的一个演示:
从图中可以看出,传统IO的特点在于:
这种设计模式在客户端连接不多,并发量不大的情况下是可以运行得很好的,但是在海量并发的情况下,这种模式就显得力不从心了。这种模式主要存在的问题有如下几点:
在传统IO模型中,由于线程在等待连接以及进行IO操作时都会阻塞当前线程,这部分损耗是非常大的。因而jdk 1.4中就提供了一套非阻塞IO的API。该API本质上是以事件驱动来处理网络事件的,而Reactor是基于该API提出的一套IO模型。如下是Reactor事件驱动模型的示意图:
从图中可以看出,在Reactor模型中,主要有四个角色:客户端连接,Reactor,Acceptor和Handler。这里Acceptor会不断地接收客户端的连接,然后将接收到的连接交由Reactor进行分发,最后有具体的Handler进行处理。改进后的Reactor模型相对于传统的IO模型主要有如下优点:
在上面的Reactor模型中,由于网络读写和业务操作都在同一个线程中,在高并发情况下,这里的系统瓶颈主要在两方面:
基于上述两个问题,这里在单线程Reactor模型的基础上提出了使用线程池的方式处理业务操作的模型。如下是该模型的示意图:
从图中可以看出,在多线程进行业务操作的模型下,该模式主要具有如下特点:
这种模式相较于前面的模式性能有了很大的提升,主要在于在进行网络读写的同时,也进行了业务计算,从而大大提升了系统的吞吐量。但是这种模式也有其不足,主要在于:
对于使用线程池处理业务操作的模型,由于网络读写在高并发情况下会成为系统的一个瓶颈,因而针对该模型这里提出了一种改进后的模型,即使用线程池进行网络读写,而仅仅只使用一个线程专门接收客户端连接。如下是该模型的示意图:
可以看到,改进后的Reactor模型将Reactor拆分为了mainReactor和subReactor。这里mainReactor主要进行客户端连接的处理,处理完成之后将该连接交由subReactor以处理客户端的网络读写。这里的subReactor则是使用一个线程池来支撑的,其读写能力将会随着线程数的增多而大大增加。对于业务操作,这里也是使用一个线程池,而每个业务请求都只需要进行编解码和业务计算。通过这种方式,服务器的性能将会大大提升,在可见情况下,其基本上可以支持百万连接。
对于上述Reactor模型,服务端主要有三个角色:Reactor,Acceptor和Handler。这里基于Doug Lea的文档对其进行了实现,如下是Reactor的实现代码:
public class Reactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocket; public Reactor(int port) throws IOException { serverSocket = ServerSocketChannel.open(); // 创建服务端的ServerSocketChannel serverSocket.configureBlocking(false); // 设置为非阻塞模式 selector = Selector.open(); // 创建一个Selector多路复用器 SelectionKey key = serverSocket.register(selector, SelectionKey.OP_ACCEPT); serverSocket.bind(new InetSocketAddress(port)); // 绑定服务端端口 key.attach(new Acceptor(serverSocket)); // 为服务端Channel绑定一个Acceptor } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); // 服务端使用一个线程不断等待客户端的连接到达 Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { dispatch(iterator.next()); // 监听到客户端连接事件后将其分发给Acceptor iterator.remove(); } selector.selectNow(); } } catch (IOException e) { e.printStackTrace(); } } private void dispatch(SelectionKey key) throws IOException { // 这里的attachement也即前面为服务端Channel绑定的Acceptor,调用其run()方法进行 // 客户端连接的获取,并且进行分发 Runnable attachment = (Runnable) key.attachment(); attachment.run(); } }
这里Reactor首先开启了一个ServerSocketChannel,然后将其绑定到指定的端口,并且注册到了一个多路复用器上。接着在一个线程中,其会在多路复用器上等待客户端连接。当有客户端连接到达后,Reactor就会将其派发给一个Acceptor,由该Acceptor专门进行客户端连接的获取。下面我们继续看一下Acceptor的代码:
public class Acceptor implements Runnable { private final ExecutorService executor = Executors.newFixedThreadPool(20); private final ServerSocketChannel serverSocket; public Acceptor(ServerSocketChannel serverSocket) { this.serverSocket = serverSocket; } @Override public void run() { try { SocketChannel channel = serverSocket.accept(); // 获取客户端连接 if (null != channel) { executor.execute(new Handler(channel)); // 将客户端连接交由线程池处理 } } catch (IOException e) { e.printStackTrace(); } } }
这里可以看到,在Acceptor获取到客户端连接之后,其就将其交由线程池进行网络读写了,而这里的主线程只是不断监听客户端连接事件。下面我们看看Handler的具体逻辑:
public class Handler implements Runnable { private volatile static Selector selector; private final SocketChannel channel; private SelectionKey key; private volatile ByteBuffer input = ByteBuffer.allocate(1024); private volatile ByteBuffer output = ByteBuffer.allocate(1024); public Handler(SocketChannel channel) throws IOException { this.channel = channel; channel.configureBlocking(false); // 设置客户端连接为非阻塞模式 selector = Selector.open(); // 为客户端创建一个新的多路复用器 key = channel.register(selector, SelectionKey.OP_READ); // 注册客户端Channel的读事件 } @Override public void run() { try { while (selector.isOpen() && channel.isOpen()) { Set<SelectionKey> keys = select(); // 等待客户端事件发生 Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); // 如果当前是读事件,则读取数据 if (key.isReadable()) { read(key); } else if (key.isWritable()) { // 如果当前是写事件,则写入数据 write(key); } } } } catch (Exception e) { e.printStackTrace(); } } // 这里处理的主要目的是处理Jdk的一个bug,该bug会导致Selector被意外触发,但是实际上没有任何事件到达, // 此时的处理方式是新建一个Selector,然后重新将当前Channel注册到该Selector上 private Set<SelectionKey> select() throws IOException { selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); if (keys.isEmpty()) { int interestOps = key.interestOps(); selector = Selector.open(); key = channel.register(selector, interestOps); return select(); } return keys; } // 读取客户端发送的数据 private void read(SelectionKey key) throws IOException { channel.read(input); if (input.position() == 0) { return; } input.flip(); process(); // 对读取的数据进行业务处理 input.clear(); key.interestOps(SelectionKey.OP_WRITE); // 读取完成后监听写入事件 } private void write(SelectionKey key) throws IOException { output.flip(); if (channel.isOpen()) { channel.write(output); // 当有写入事件时,将业务处理的结果写入到客户端Channel中 key.channel(); channel.close(); output.clear(); } } // 进行业务处理,并且获取处理结果。本质上,基于Reactor模型,如果这里成为处理瓶颈, // 则直接将其处理过程放入线程池即可,并且使用一个Future获取处理结果,最后写入客户端Channel private void process() { byte[] bytes = new byte[input.remaining()]; input.get(bytes); String message = new String(bytes, CharsetUtil.UTF_8); System.out.println("receive message from client: /n" + message); output.put("hello client".getBytes()); } }
在Handler中,主要进行的就是为每一个客户端Channel创建一个Selector,并且监听该Channel的网络读写事件。当有事件到达时,进行数据的读写,而业务操作这交由具体的业务线程池处理。
本文首先讲解了Java IO的几种网络模型,着重比较了每种不同的模型的优缺点,并且基于Reactor模型,使用一个实例对其实现过程进行了演示。