在网络通信的时候,一般来说客户端和服务端只需要建立一个连接就够了,但是在某些场景下我们需要建立多个连接。比如使用了负载均衡,如果只建立一个连接,可能会出现负载不均衡的场景,有时候我们为了增加客户端的吞吐量也需要建立连接池。
创建连接池的最大难点就在于如何保证在高并发的情况下,能够创建我们指定的连接数,以及如何做好连接池的管理,比如连接池无可用连接怎么办?连接假死后如何为连接池补充新的连接。Netty为我们提供了两个连接池实现这些功能。SimpleChannelPool封装了连接池的基本功能,但是它不能指定连接池的连接数,所以不能被应用到生产。FixedChannelPool是功能更加强大的连接池,它扩展了SimpleChannelPool可以被应用到生产中。
public class ClientMock { private static SimpleChannelPoolMap poolMap; public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("Client-Event", false)); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, Boolean.TRUE) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); poolMap = new SimpleChannelPoolMap(bootstrap); SimpleChannelPool channelPool = poolMap.get(new InetSocketAddress(8090)); // 从连接池获取一个连接 channelPool.acquire().addListener(new FutureListener<Channel>() { @Override public void operationComplete(Future<Channel> future) throws Exception { if (future.isSuccess()) { Channel channel = future.getNow(); channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8)); // 将连接放入连接池 channelPool.release(channel); } if (future.cause() != null) { System.out.println(future.cause()); } } }); } } 复制代码
public class SimpleChannelPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> { private Bootstrap bootstrap; SimpleHandler simpleHandler = new SimpleHandler(); public SimpleChannelPoolMap(Bootstrap bootstrap) { this.bootstrap = bootstrap; } @Override protected SimpleChannelPool newPool(InetSocketAddress key) { return new SimpleChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() { @Override public void channelReleased(Channel ch) throws Exception { System.out.println("channelReleased: " + ch); } @Override public void channelAcquired(Channel ch) throws Exception { System.out.println("channelAcquired: " + ch); } @Override public void channelCreated(Channel ch) throws Exception { // 为channel添加handler ch.pipeline().addLast(simpleHandler); } }); } } 复制代码
很简单就能构建出一个netty的连接池。
1. AbstractChannelPoolMap
在get连接池的时候通过ConcurrentHashMap的 putIfAbsent
保证只创建一个连接池。
private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap(); @Override public final P get(K key) { P pool = map.get(checkNotNull(key, "key")); if (pool == null) { // 创建连接池 pool = newPool(key); // 如果已经创建了连接池,那么就把新的关闭,然后返回老的 P old = map.putIfAbsent(key, pool); if (old != null) { // We need to destroy the newly created pool as we not use it. poolCloseAsyncIfSupported(pool); pool = old; } } return pool; } 复制代码
public class SimpleChannelPool implements ChannelPool { private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool"); private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque(); public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck, boolean lastRecentUsed) { this.handler = checkNotNull(handler, "handler"); this.healthCheck = checkNotNull(healthCheck, "healthCheck"); this.releaseHealthCheck = releaseHealthCheck; // Clone the original Bootstrap as we want to set our own handler this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); this.bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { assert ch.eventLoop().inEventLoop(); handler.channelCreated(ch); } }); this.lastRecentUsed = lastRecentUsed; } @Override public final Future<Channel> acquire() { return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise")); } 复制代码
releaseHealthCheck lastRecentUsed
创建连接池的时候会调用ChannelPoolHandler#channelCreated方法做对channel的初始化操作。
这里的断言真的是非常细节,调用 initChannel
方法的时候,channel的EventLoop已经初始化了,所以这里进行了一次断言。
接下来看下真正获取连接方法 acquire
@Override public final Future<Channel> acquire() { // 获取线程选择器,并创建一个Promise return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise")); } private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY, this); //创建连接 ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f, promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future, promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; } 复制代码
获取连接的方法是一个完全的异步编程,如果你不理解Netty的源码和EevntLoop的原理这里确实很难理解。
public void promiseTest() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); EventExecutor executorA = new DefaultEventExecutor(new DefaultThreadFactory("EventA")); EventExecutor executorB = new DefaultEventExecutor(); Channel channel = new NioSocketChannel(); // 为EventLoop注册一个Promise Promise<Channel> newPromise = executorA.<Channel>newPromise(); System.out.println(Thread.currentThread().getName()); newPromise.addListener(f -> { if (f.isSuccess()) { Assert.assertEquals(channel, f.getNow()); System.out.println(Thread.currentThread().getName()); latch.countDown(); } }); Assert.assertEquals(false, executorB.inEventLoop()); executorB.execute(new Runnable() { @Override public void run() { newPromise.setSuccess(channel); } }); latch.await(); } 复制代码
这段代码可以帮助理解Netty的异步编程方式。然后我们再看SimpleChannelPool的acquire方法。
首先通过Netty客户端BootStrap的EventLoop线程选择器获取一个EventLoop,并创建了一个Promise。然后这个EventLoop将用来执行Channel的连接操作,当channel获取成功或者失败的时候都会通知到这个Promise。
然后根据 lastRecentUsed
的值,判断是使用LIFO还是FIFO的方式获取一个连接。
protected Channel pollChannel() { return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); } 复制代码
接下来如果没有获取到连接,就会执行建立连接的方法,如果从连接池获取到连接,会对这个连接进行健康检查。
ChannelFuture f = connectChannel(bs); 复制代码
connectChannel
是真正建立连接的方法,这个方法和调用BootStrap#connect()走的是一样的逻辑,主要就创建channel并为这个channel绑定EventLoop,并把建立连接的操作提交到EventLoop的taskQueue。
private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception { //执行成功 if (future.isSuccess()) { Channel channel = future.channel(); handler.channelAcquired(channel); // 回写结果 if (!promise.trySuccess(channel)) { // Promise was completed in the meantime (like cancelled), just release the channel again release(channel); } } else { promise.tryFailure(future.cause()); } } 复制代码
如果连接建立成功,通过future获取到channel,并执行ChannelPoolHandler#channelAcquired的方法,并调用Promise的trySuccess方法,尝试把channel设置到promise的结果。
:warning:这里有个非常重要的信息,就是acquire方法,在建立连接并写入结果到promise后并没有把连接放到连接池。而是写入promise失败才会把这个连接放到连接池里。
如果 pollChannel()
获取连接不为空,则会对这个连接进行健康检查。
如果这个连接处于活跃状态,那么执行ChannelPoolHandler#channelAcquired,并把channel写入到promise。
如果这个连接处于非活跃状态,则会关闭这个连接。并重新执行 acquireHealthyFromPoolOrNew
方法,从连接池中获取一个新的连接,
@Override public Future<Void> release(final Channel channel, final Promise<Void> promise) { checkNotNull(channel, "channel"); checkNotNull(promise, "promise"); try { EventLoop loop = channel.eventLoop(); // 判断是不是eventloop线程 if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); } }); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } return promise; } private void doReleaseChannel(Channel channel, Promise<Void> promise) { assert channel.eventLoop().inEventLoop(); // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. if (channel.attr(POOL_KEY).getAndSet(null) != this) { closeAndFail(channel, // Better include a stacktrace here as this is an user error. new IllegalArgumentException( "Channel " + channel + " was not acquired from this ChannelPool"), promise); } else { try { if (releaseHealthCheck) { doHealthCheckOnRelease(channel, promise); } else { releaseAndOffer(channel, promise); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } } } private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception { final Future<Boolean> f = healthCheck.isHealthy(channel); if (f.isDone()) { releaseAndOfferIfHealthy(channel, promise, f); } else { f.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { releaseAndOfferIfHealthy(channel, promise, f); } }); } } 复制代码
release方法先判断当前线程是不是channel的eventloop线程,如果不是则把放入连接池的任务提交到eventloop线程执行。
在将连接放入连接池的时候会根据 releaseHealthCheck
判断,是否对连接进行健康检查。
SimpleChannelPool实现了连接池的基本功能,但是他不能支持限制连接池的连接数,所以在生产环境我们需要使用 FixedChannelPool
public FixedChannelPool(Bootstrap bootstrap, ChannelPoolHandler handler, ChannelHealthChecker healthCheck, AcquireTimeoutAction action, final long acquireTimeoutMillis, int maxConnections, int maxPendingAcquires, boolean releaseHealthCheck, boolean lastRecentUsed) { super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed); if (maxConnections < 1) { throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)"); } if (maxPendingAcquires < 1) { throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)"); } if (action == null && acquireTimeoutMillis == -1) { timeoutTask = null; acquireTimeoutNanos = -1; } else if (action == null && acquireTimeoutMillis != -1) { throw new NullPointerException("action"); } else if (action != null && acquireTimeoutMillis < 0) { throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)"); } else { acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis); switch (action) { case FAIL: timeoutTask = new TimeoutTask() { @Override public void onTimeout(AcquireTask task) { // Fail the promise as we timed out. task.promise.setFailure(new TimeoutException( "Acquire operation took longer then configured maximum time") { @Override public Throwable fillInStackTrace() { return this; } }); } }; break; case NEW: timeoutTask = new TimeoutTask() { @Override public void onTimeout(AcquireTask task) { // Increment the acquire count and delegate to super to actually acquire a Channel which will // create a new connection. task.acquired(); FixedChannelPool.super.acquire(task.promise); } }; break; default: throw new Error(); } } executor = bootstrap.config().group().next(); this.maxConnections = maxConnections; this.maxPendingAcquires = maxPendingAcquires; } 复制代码
maxConnections
连接池中的最大连接数。 acquireTimeoutNanos
等待连接池连接的最大时间,单位毫秒。 maxPendingAcquires
在请求获取/建立连接大于maxConnections数时,创建等待建立连接的最大定时任务数量。例如maxConnections=2,此时已经建立了2连接,但是没有放入到连接池中,接下来的请求就会放入到一个后台执行的定时任务中,如果到了时间连接池中还没有连接,就可以建立不大于 maxPendingAcquires
的连接数,如果连接池中有连接了就从连接池中获取。 executor
用于执行获取连接和释放连接的EventLoop。 TimeoutTask.FAIL
:如果连接池中没有可用连接了,等待 acquireTimeoutNanos
后,抛出一个超时异常。 TimeoutTask.NEW
:如果连接池中没有可用连接了,等待 acquireTimeoutNanos
后,创建一个新的连接。 @Override public Future<Channel> acquire(final Promise<Channel> promise) { // 使用同一个executor保证线程安全 try { if (executor.inEventLoop()) { acquire0(promise); } else { executor.execute(new Runnable() { @Override public void run() { acquire0(promise); } }); } } catch (Throwable cause) { promise.setFailure(cause); } return promise; } private void acquire0(final Promise<Channel> promise) { assert executor.inEventLoop(); if (closed) { promise.setFailure(new IllegalStateException("FixedChannelPool was closed")); return; } if (acquiredChannelCount.get() < maxConnections) { assert acquiredChannelCount.get() >= 0; // We need to create a new promise as we need to ensure the AcquireListener runs in the correct // EventLoop // 创建一个新的Promise Promise<Channel> p = executor.newPromise(); AcquireListener l = new AcquireListener(promise); l.acquired(); p.addListener(l); super.acquire(p); } else { if (pendingAcquireCount >= maxPendingAcquires) { tooManyOutstanding(promise); } else { AcquireTask task = new AcquireTask(promise); if (pendingAcquireQueue.offer(task)) { ++pendingAcquireCount; if (timeoutTask != null) { task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); } } else { tooManyOutstanding(promise); } } assert pendingAcquireCount > 0; } } 复制代码
FixedChannelPool重写了SimpleChannelPool的 acquire(final Promise<Channel> promise)
方法,把所有的获取连接的任务 acquire0
交给一个EventLoop执行,让获取我而不是像SimpleChannelPool中每获取一个连接都使用线程选择器选择一个EventLoop,用此来保证在高并发的情况下 acquiredChannelCountd < maxConnections
的安全性,可以创建预期的连接数。所以在 acquire0
中需要为FixedChannelPool中的EventLoop创建一个新的promise,然后调用SimpleChannelPool的 acquire(final Promise<Channel> promise)
方法用来建立一个新的连接或者从连接池中获取一个连接。
每创建一个连接并且这个连接没有放入到连接池时 acquiredChannelCount
都会增加加1,用来保证不创建超过 maxConnections
的连接数。 acquiredChannelCountd > maxConnections
的时候,FixedChannelPool会根据 pendingAcquireCount
的值来判断是否创建一个定时的任务,去建立新的连接。
private abstract class TimeoutTask implements Runnable { @Override public final void run() { assert executor.inEventLoop(); long nanoTime = System.nanoTime(); for (;;) { AcquireTask task = pendingAcquireQueue.peek(); // 检查是否到了执行时间 if (task == null || nanoTime - task.expireNanoTime < 0) { break; } pendingAcquireQueue.remove(); --pendingAcquireCount; onTimeout(task); } } public abstract void onTimeout(AcquireTask task); } 复制代码
释放连接的时候同样使用FixedChannelPool中的EventLoop保证高并发下的线程安全问题,在释放连接的时候会执行 decrementAndRunTaskQueue()
方法,成功后会尝试终止定时任务,从连接池中返回连接到promise。
private void decrementAndRunTaskQueue() { // We should never have a negative value. int currentCount = acquiredChannelCount.decrementAndGet(); assert currentCount >= 0; runTaskQueue(); } private void runTaskQueue() { while (acquiredChannelCount.get() < maxConnections) { AcquireTask task = pendingAcquireQueue.poll(); if (task == null) { break; } // Cancel the timeout if one was scheduled ScheduledFuture<?> timeoutFuture = task.timeoutFuture; if (timeoutFuture != null) { timeoutFuture.cancel(false); } --pendingAcquireCount; task.acquired(); super.acquire(task.promise); } // We should never have a negative value. assert pendingAcquireCount >= 0; assert acquiredChannelCount.get() >= 0; } 复制代码