在上一节中我们描述了Tomcat的整体架构,我们知道了Tomcat分为两个大组件,一个连接器和一个容器。而我们这次要讲的 EndPoint
的组件就是属于连接器里面的。它是一个通信的端点,就是负责对外实现TCP/IP协议。 EndPoint
是个接口,它的具体实现类就是 AbstractEndpoint
,而 AbstractEndpoint
具体的实现类就有 AprEndpoint
、 Nio2Endpoint
、 NioEndpoint
AprEndpoint Nio2Endpoint NioEndpoint
我们知道 NioEndpoint
而 NioEndpoint
为了实现上面这两步,用了五个组件来。这五个组件是 LimitLatch
、 Acceptor
、 Poller
、 SocketProcessor
、 Executor
/** * Threads used to accept new connections and pass them to worker threads. */ protected List<Acceptor<U>> acceptors; /** * counter for nr of connections handled by an endpoint */ private volatile LimitLatch connectionLimitLatch = null; /** * The socket pollers. */ private Poller[] pollers = null; 内部类 SocketProcessor /** * External Executor based thread pool. */ private Executor executor = null;
:连接控制器,负责控制最大的连接数 Acceptor
:负责接收新的连接,然后返回一个 Channel
对象给 Poller
:可以将其看成是NIO中 Selector
,负责监控 Channel
的状态 SocketProcessor
:可以看成是一个被封装的任务类 Executor
:Tomcat自己扩展的线程池,用来执行任务类 用图简单表示就是以下的关系
我们上面说了 LimitLatch
主要是用来控制Tomcat所能接收的最大数量连接,如果超过了此连接,那么Tomcat就会将此连接线程阻塞等待,等里面有其他连接释放了再消费此连接。那么 LimitLatch
是如何做到呢?我们可以看 LimitLatch
public class LimitLatch { private static final Log log = LogFactory.getLog(LimitLatch.class); private class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; public Sync() { } @Override protected int tryAcquireShared(int ignored) { long newCount = count.incrementAndGet(); if (!released && newCount > limit) { // Limit exceeded count.decrementAndGet(); return -1; } else { return 1; } } @Override protected boolean tryReleaseShared(int arg) { count.decrementAndGet(); return true; } } private final Sync sync; //当前连接数 private final AtomicLong count; //最大连接数 private volatile long limit; private volatile boolean released = false; }
我们可以看到它内部实现了 AbstractQueuedSynchronizer
,AQS其实就是一个框架,实现它的类可以自定义控制线程什么时候挂起什么时候释放。 limit
参数就是控制的最大连接数。我们可以看到 AbstractEndpoint
调用 LimitLatch
的 countUpOrAwait
public void countUpOrAwait() throws InterruptedException { if (log.isDebugEnabled()) { log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount()); } sync.acquireSharedInterruptibly(1); }
AQS是如何知道什么时候阻塞线程呢?即不能获取连接呢?这些就靠用户自己实现 AbstractQueuedSynchronizer
自己来定义什么时候获取连接,什么时候释放连接了。可以看到Sync类重写了 tryAcquireShared
和 tryReleaseShared
方法。在 tryAcquireShared
方法中定义了一旦当前连接数大于了设置的最大连接数,那么就会返回 -1
是接收连接的,我们可以看到 Acceptor
实现了 Runnable
接口,那么在哪会新开启线程来执行 Acceptor
的run方法呢?在 AbstractEndpoint
的 startAcceptorThreads
protected void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor<U> acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
可以看到这里可以设置开启几个 Acceptor
,默认是一个。而一个端口只能对应一个 ServerSocketChannel
,那么这个 ServerSocketChannel
在哪初始化呢?我们可以看到在 Acceptor<U> acceptor = new Acceptor<>(this);
这句话中传入了this进去,那么应该是由 Endpoint
组件初始化的连接。在 NioEndpoint
的 initServerSocket
// Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } serverSock.configureBlocking(true); //mimic APR behavior }
被设置成了阻塞的模式,也就是说是以阻塞方式接受连接的。或许会有疑问。在平时的NIO编程中Channel不是都要设置成非阻塞模式吗?这里解释一下,如果是设置成非阻塞模式那么就必须设置一个 Selector
这里需要注意一点,每个 Acceptor
在生成 PollerEvent
对象放入 Poller
队列中时都是随机取出 Poller
对象的,具体代码可以看如下,所以 Poller
中的 Queue
对象设置成了 SynchronizedQueue<PollerEvent>
,因为可能有多个 Acceptor
同时向此 Poller
的队列中放入 PollerEvent
public Poller getPoller0() { if (pollerThreadCount == 1) { return pollers[0]; } else { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; } }
所有的 Acceptor
共用这一个连接,在 Acceptor
的 run
@Override public void run() { // Loop until we receive a shutdown command while (endpoint.isRunning()) { try { //如果到了最大连接数,线程等待 endpoint.countUpOrAwaitConnection(); U socket = null; try { //调用accept方法获得一个连接 socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // 出异常以后当前连接数减掉1 endpoint.countDownConnection(); } // 配置Socket if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } }
LimitLatch endpoint.setSocketOptions(socket)
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { // 设置Socket为非阻塞模式,供Poller调用 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = null; if (nioChannels != null) { channel = nioChannels.pop(); } if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } //注册ChannelEvent,其实是将ChannelEvent放入到队列中,然后Poller从队列中取 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
其实里面重要的就是将 Acceptor
与一个 Poller
绑定起来,然后两个组件通过队列通信,每个Poller都维护着一个 SynchronizedQueue
队列, ChannelEvent
放入到队列中,然后 Poller
我们可以看到 Poller
是 NioEndpoint
的内部类,而它也是实现了 Runnable
接口,可以看到在其类中维护了一个Quene和Selector,定义如下。所以本质上 Poller
就是 Selector
private Selector selector; private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
@Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { //查看是否有连接进来,如果有就将Channel注册进Selector中 hasEvents = events(); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } if (keyCount == 0) { hasEvents = (hasEvents | events()); } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); }
其中主要的就是调用了 events()
方法,就是不断的查看队列中是否有 Pollerevent
事件,如果有的话就将其取出然后把里面的 Channel
取出来注册到该 Selector
中,然后不断轮询所有注册过的 Channel
我们知道 Poller
在轮询 Channel
有事件发生时,就会调用将此事件封装起来,然后交给线程池去执行。那么这个包装类就是 SocketProcessor
。而我们打开此类,能够看到它也实现了 Runnable
接口,用来定义线程池 Executor
中线程所执行的任务。那么这里是如何将 Channel
中的字节流转换为Tomcat需要的 ServletRequest
对象呢?其实就是调用了 Http11Processor
public interface Executor extends java.util.concurrent.Executor, Lifecycle
但是在Tomcat自定义的线程池中是不一样的,通过重写了 execute
差别就在于第四步的差别,原生线程池的处理策略是只要当前线程数大于最大线程数,那么就抛异常,而Tomcat的则是如果当前线程数大于最大线程数,就再尝试一次,如果还是满的才会抛异常。下面是定制化线程池 execute
public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { //获得任务队列 final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
在代码中,我们可以看到有这么一句 submittedCount.incrementAndGet();
/** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */ private final AtomicInteger submittedCount = new AtomicInteger(0);
为什么会有这么一个参数呢?我们知道定制的队列是继承了 LinkedBlockingQueue
,而 LinkedBlockingQueue
队列默认是没有边界的。于是我们就传入了一个参数, maxQueueSize
那么如何规定任务队列是否已满呢?如果设置了队列的最大长度当然好了,但是Tomcat默认情况下是没有设置,所以默认是无限的。所以Tomcat的 TaskQueue
继承了 LinkedBlockingQueue
,重写了 offer
@Override public boolean offer(Runnable o) { if (parent==null) return super.offer(o); //如果当前线程数等于最大线程数,此时不能创建新线程,只能添加进任务队列中 if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //如果已提交但是未完成的任务数小于等于当前线程数,说明能处理过来,就放入队列中 if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); //到这一步说明,已提交但是未完成的任务数大于当前线程数,如果当前线程数小于最大线程数,就返回false新建线程 if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; return super.offer(o); }
这就是 submittedCount