前言
上篇文章介绍了 NioEndpoint,其中讲到了在 NioEndpoint#startInternal 方法里创建并启动了 Acceptor 和 Poller,线程。本篇文章先看 Acceptor,下篇文章再看 Poller。
1. Acceptor
Acceptor 的构造方法声明为:
private final AbstractEndpoint<?,U> endpoint; public Acceptor(AbstractEndpoint<?,U> endpoint) { this.endpoint = endpoint; }
其中 endpoint 参数是在 NioEndpoint#startAcceptorThreads 方法里 new Acceptor 时传入的 NioEndpoint 对象。
Acceptor 实现了 Runnable 方法,因此它的 run 方法是 Acceptor 的关键。
@Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (endpoint.isRunning()) { // Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!endpoint.isRunning()) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don't accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn't get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); String msg = sm.getString("endpoint.accept.fail"); // APR specific. // Could push this down but not sure it is worth the trouble. if (t instanceof Error) { Error e = (Error) t; if (e.getError() == 233) { // Not an error on HP-UX so log as a warning // so it can be filtered out on that platform // See bug 50273 log.warn(msg, t); } else { log.error(msg, t); } } else { log.error(msg, t); } } } state = AcceptorState.ENDED; }
run 方法的代码被包裹在一个 while 循环里,while 循环的判断条件是 endpoint.isRunning(),也就是 NioEndpoint 的父类 AbstractEndpoint 里的 running 字段。
/** * Running state of the endpoint. */ protected volatile boolean running = false;
这个 running 字段在 NioEndpoint#startInternal 方法里被置为 true。在 NioEndpoint#stopInternal 方法里 running 置为 false,
在最外层的 while 循环里,就是 run 方法的核心了。
// Loop if endpoint is paused while (endpoint.isPaused() && endpoint.isRunning()) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } }
首先判断 endpoint.isPaused() 是不是为 true,如果是就让线程 sleep 50毫秒,并把 Acceptor 的状态设置为 AcceptorState.PAUSED。
这个 endpoint.isPaused() 跟 isRunning 方法类似,也就是判断
AbstractEndpoint 里的一个 paused 属性,起声明如下
/** * Will be set to true whenever the endpoint is paused. */ protected volatile boolean paused = false;
这个 pause 的是在 AbstractEndpoint#pause 里置为 true 的。
然后把 Acceptor 的状态改为 AcceptorState.RUNNING。
接着进入 try 语句块。首先调用 endpoint.countUpOrAwaitConnection()
//if we have reached max connections, wait endpoint.countUpOrAwaitConnection();
protected void countUpOrAwaitConnection() throws InterruptedException { if (maxConnections==-1) return; LimitLatch latch = connectionLimitLatch; if (latch!=null) latch.countUpOrAwait(); }
可以看出,countUpOrAwaitConnection 这个方法是判断是否已超过 maxConnections,如果是就调用 latch.countUpOrAwait() 等待。
然后调用 endpoint.serverSocketAccept() 方法,返回一个泛型对象,这个泛型对象的具体类型在 NioEndpoint 对象中就确立了。
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U>
从 NioEndpoint 和 AbstractJsseEndpoint 的声明中可以看出,泛型 U 的具体类型是 SocketChannel。即 java.nio.channels.SocketChannel。
也就是说 endpoint.serverSocketAccept() 获取的是一个 SocketChannel 对象。
@Override protected SocketChannel serverSocketAccept() throws Exception { return serverSock.accept(); }
serverSocketAccept 就是简单调用 serverSock.accept() 方法获取一个 SocketChannel 对象。在 nio 编程里,可以认为一个 SocketChannel 对象代表一个服务端与客户端的连接。
这个 serverSock 就是在 NioEndpoint#initServerSocket() 里调用 ServerSocketChannel.open() 初始化的。
拿到这个 SocketChannel 对象之后就配置这个对象
// Configure the socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); }
上面代码的逻辑很简单,就是调用 endpoint.setSocketOptions(socket) 方法,如果不成功就调用 endpoint.closeSocket(socket) 方法。destroySocket(socket) 方法内部也是调用 closeSocket 方法。
protected void destroySocket(U socket) { closeSocket(socket); }
@Override protected void closeSocket(SocketChannel socket) { countDownConnection(); try { socket.socket().close(); } catch (IOException ioe) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } try { socket.close(); } catch (IOException ioe) { if (log.isDebugEnabled()) { log.debug(sm.getString("endpoint.err.close"), ioe); } } }
closeSocket 在 NioEndpoint 里,而 destroySocket 在 AbstractEndpoint 里。closeSocket 方法逻辑很简单就是调用 SocketChannel.socket().close() 和 SocketChannel.close() 方法。
关键地方在于 endpoint.setSocketOptions(socket) 方法。
/** * Process the specified connection. * @param socket The socket channel * @return <code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ @Override protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
setSocketOptions 方法里,首先用 socketProperties 给这个 SocketChannel 对象的 Socket 设置了一些属性。
然后,从 nioChannels 这个 SynchronizedStack<NioChannel> 缓存池里获取一个 NioChannel 对象,如果获取不到就创建一个,创建的 NioChannel 对象的时候也创建了一个 SocketBufferHandler 对象。
public SocketBufferHandler(int readBufferSize, int writeBufferSize, boolean direct) { this.direct = direct; if (direct) { readBuffer = ByteBuffer.allocateDirect(readBufferSize); writeBuffer = ByteBuffer.allocateDirect(writeBufferSize); } else { readBuffer = ByteBuffer.allocate(readBufferSize); writeBuffer = ByteBuffer.allocate(writeBufferSize); } }
SocketBufferHandler 对象里包含了两个 ByteBuffer 对象,一个读一个写。
protected SocketChannel sc = null; protected final SocketBufferHandler bufHandler; public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) { this.sc = channel; this.bufHandler = bufHandler; }
NioChannel 封装了对 SocketChannel 对象的读写操作。
最后 setSocketOptions 里调用了 getPoller0().register(channel)。
private Poller[] pollers = null; private AtomicInteger pollerRotater = new AtomicInteger(0); public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
getPoller0() 方法就是从 pollers 数组里选一个 Poller 对象,选取的算法是轮询选取。
选出 Poller 对象后,调用其 register(channel) 方法。
/** * Registers a newly created socket with the poller. * * @param socket The newly created socket */ public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
register 方法传入的参数是 NioChannel 而不是 SocketChannel 了,SocketChannel 已经与 NioChannel 关联了。
register 第一行就调用 NioChannel#setPoller 方法,把当前 Poller 对象复制给 NioChannel 的属性,将 NioChannel 对象与 Poller 对象关联起来。
接着 创建了一个 NioSocketWrapper 对象并设置了相关属性,其中最重要的是 ka.interestOps(SelectionKey.OP_READ) 这一行设置了 NioSocketWrapper 所感兴趣的操作。
然后把 NioChannel 对象与 NioSocketWrapper 对象关联起来。
public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); pool = endpoint.getSelectorPool(); socketBufferHandler = channel.getBufHandler(); }
NioSocketWrapper 的声明为
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {
SocketWrapperBase 的构造方法为
public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; this.endpoint = endpoint; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.blockingStatusReadLock = lock.readLock(); this.blockingStatusWriteLock = lock.writeLock(); }
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel>
public abstract class SocketWrapperBase<E>
SocketWrapperBase 声明里有一个泛型 E,而 NioSocketWrapper 的声明里,泛型 E 的具体类型则是 NioChannel。
register 方法的最后从 eventCache 缓存池里获取一个 PollerEvent 对象,如果获取不到就创建一个 PollerEvent 对象。
private NioChannel socket; private int interestOps; private NioSocketWrapper socketWrapper; public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { reset(ch, w, intOps); } public void reset(NioChannel ch, NioSocketWrapper w, int intOps) { socket = ch; interestOps = intOps; socketWrapper = w; }
创建 PollerEvent 对象时传入的参数分别是前面的 NioChannel 、NioSocketWrapper 对象,以及一个 int 类型的常量 OP_REGISTER,分别赋值给 PollerEvent 的属性,另外 PollerEvent 也实现了 Runnable 接口,这几个属性在 PollerEvent#run 方法里都有对应的作用。
拿到 PollerEvent 对象后,调用 addEvent(r) 方法把这个对象加入的队列中等待后续 Poller 线程的处理。
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>(); private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); }
events 缓存的 PollerEvent 对象,会在 Poller#run 方法里被处理。
小结
本文分析了 Acceptor 的 run 方法,也就是 Acceptor 线程做的事情。可以看出 Acceptor 线程在一个循环里一直接受客户端连接,生成 SocketChannel 对象,并把这个 SocketChannel 对象封装成 NioChannel 和 NioSocketWrapper 对象,并把这两个对象放在一个 PollerEvent 对象里,并把这个 PollerEvent 对象加入的缓存池里等待 Poller 线程的处理。