转载

深入理解Netty连接池SimpleChannelPool和FixedChannelPool

在网络通信的时候,一般来说客户端和服务端只需要建立一个连接就够了,但是在某些场景下我们需要建立多个连接。比如使用了负载均衡,如果只建立一个连接,可能会出现负载不均衡的场景,有时候我们为了增加客户端的吞吐量也需要建立连接池。

创建连接池的最大难点就在于如何保证在高并发的情况下,能够创建我们指定的连接数,以及如何做好连接池的管理,比如连接池无可用连接怎么办?连接假死后如何为连接池补充新的连接。Netty为我们提供了两个连接池实现这些功能。SimpleChannelPool封装了连接池的基本功能,但是它不能指定连接池的连接数,所以不能被应用到生产。FixedChannelPool是功能更加强大的连接池,它扩展了SimpleChannelPool可以被应用到生产中。

Netty的连接池最简单的使用姿势

public class ClientMock {
    private static SimpleChannelPoolMap poolMap;

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("Client-Event", false));
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        poolMap = new SimpleChannelPoolMap(bootstrap);

        SimpleChannelPool channelPool = poolMap.get(new InetSocketAddress(8090));
        // 从连接池获取一个连接
        channelPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {
                    Channel channel = future.getNow();
                    channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
                    // 将连接放入连接池
                    channelPool.release(channel);
                }
                if (future.cause() != null) {
                    System.out.println(future.cause());
                }
            }
        });
    }
}
复制代码
public class SimpleChannelPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> {
    private Bootstrap bootstrap;
    SimpleHandler simpleHandler = new SimpleHandler();

    public SimpleChannelPoolMap(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    protected SimpleChannelPool newPool(InetSocketAddress key) {
        return new SimpleChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() {
            @Override
            public void channelReleased(Channel ch) throws Exception {
                System.out.println("channelReleased: " + ch);
            }

            @Override
            public void channelAcquired(Channel ch) throws Exception {
                System.out.println("channelAcquired: " + ch);

            }
            @Override
            public void channelCreated(Channel ch) throws Exception {
                // 为channel添加handler
                ch.pipeline().addLast(simpleHandler);
            }
        });
    }
}

复制代码

很简单就能构建出一个netty的连接池。

SimpleChannelPool

问题一:如何保证一个key只创建一个连接池?

1. AbstractChannelPoolMap 在get连接池的时候通过ConcurrentHashMap的 putIfAbsent 保证只创建一个连接池。

private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();

@Override
    public final P get(K key) {
        P pool = map.get(checkNotNull(key, "key"));
        if (pool == null) {
            // 创建连接池
            pool = newPool(key);
            // 如果已经创建了连接池,那么就把新的关闭,然后返回老的
            P old = map.putIfAbsent(key, pool);
            if (old != null) {
                // We need to destroy the newly created pool as we not use it.
                poolCloseAsyncIfSupported(pool);
                pool = old;
            }
        }
        return pool;
    }
复制代码

问题二:如何创建连接?

SimpleChannelPool#acquire()

public class SimpleChannelPool implements ChannelPool {
    private static final AttributeKey<SimpleChannelPool> POOL_KEY =
        AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
                handler.channelCreated(ch);
            }
        });
        this.lastRecentUsed = lastRecentUsed;
    }
    
    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }    
复制代码
releaseHealthCheck
lastRecentUsed

创建连接池的时候会调用ChannelPoolHandler#channelCreated方法做对channel的初始化操作。

这里的断言真的是非常细节,调用 initChannel 方法的时候,channel的EventLoop已经初始化了,所以这里进行了一次断言。

接下来看下真正获取连接方法 acquire

@Override
    public final Future<Channel> acquire() {
        // 获取线程选择器,并创建一个Promise
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }

    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                //创建连接
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
复制代码

获取连接的方法是一个完全的异步编程,如果你不理解Netty的源码和EevntLoop的原理这里确实很难理解。

public void promiseTest() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        EventExecutor executorA = new DefaultEventExecutor(new DefaultThreadFactory("EventA"));
        EventExecutor executorB = new DefaultEventExecutor();
        Channel channel = new NioSocketChannel();
        // 为EventLoop注册一个Promise
        Promise<Channel> newPromise = executorA.<Channel>newPromise();
        System.out.println(Thread.currentThread().getName());
        newPromise.addListener(f -> {
            if (f.isSuccess()) {
                Assert.assertEquals(channel, f.getNow());
                System.out.println(Thread.currentThread().getName());
                latch.countDown();
            }
        });
        Assert.assertEquals(false, executorB.inEventLoop());
        executorB.execute(new Runnable() {
            @Override
            public void run() {
                newPromise.setSuccess(channel);
            }
        });
        latch.await();
    }
复制代码

这段代码可以帮助理解Netty的异步编程方式。然后我们再看SimpleChannelPool的acquire方法。

首先通过Netty客户端BootStrap的EventLoop线程选择器获取一个EventLoop,并创建了一个Promise。然后这个EventLoop将用来执行Channel的连接操作,当channel获取成功或者失败的时候都会通知到这个Promise。

然后根据 lastRecentUsed 的值,判断是使用LIFO还是FIFO的方式获取一个连接。

protected Channel pollChannel() {
        return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
    }
复制代码

接下来如果没有获取到连接,就会执行建立连接的方法,如果从连接池获取到连接,会对这个连接进行健康检查。

ChannelFuture f = connectChannel(bs);
复制代码

connectChannel 是真正建立连接的方法,这个方法和调用BootStrap#connect()走的是一样的逻辑,主要就创建channel并为这个channel绑定EventLoop,并把建立连接的操作提交到EventLoop的taskQueue。

private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception {
        //执行成功
        if (future.isSuccess()) {
            Channel channel = future.channel();
            handler.channelAcquired(channel);
            // 回写结果
            if (!promise.trySuccess(channel)) {
                // Promise was completed in the meantime (like cancelled), just release the channel again
                release(channel);
            }
        } else {
            promise.tryFailure(future.cause());
        }
    }
复制代码

如果连接建立成功,通过future获取到channel,并执行ChannelPoolHandler#channelAcquired的方法,并调用Promise的trySuccess方法,尝试把channel设置到promise的结果。

:warning:这里有个非常重要的信息,就是acquire方法,在建立连接并写入结果到promise后并没有把连接放到连接池。而是写入promise失败才会把这个连接放到连接池里。

如果 pollChannel() 获取连接不为空,则会对这个连接进行健康检查。

如果这个连接处于活跃状态,那么执行ChannelPoolHandler#channelAcquired,并把channel写入到promise。

如果这个连接处于非活跃状态,则会关闭这个连接。并重新执行 acquireHealthyFromPoolOrNew 方法,从连接池中获取一个新的连接,

SimpleChannelPool#relesae()

@Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            // 判断是不是eventloop线程
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doReleaseChannel(Channel channel, Promise<Void> promise) {
        assert channel.eventLoop().inEventLoop();
        // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
        if (channel.attr(POOL_KEY).getAndSet(null) != this) {
            closeAndFail(channel,
                         // Better include a stacktrace here as this is an user error.
                         new IllegalArgumentException(
                                 "Channel " + channel + " was not acquired from this ChannelPool"),
                         promise);
        } else {
            try {
                if (releaseHealthCheck) {
                    doHealthCheckOnRelease(channel, promise);
                } else {
                    releaseAndOffer(channel, promise);
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
        }
    }

    private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
        final Future<Boolean> f = healthCheck.isHealthy(channel);
        if (f.isDone()) {
            releaseAndOfferIfHealthy(channel, promise, f);
        } else {
            f.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    releaseAndOfferIfHealthy(channel, promise, f);
                }
            });
        }
    }
复制代码

release方法先判断当前线程是不是channel的eventloop线程,如果不是则把放入连接池的任务提交到eventloop线程执行。

在将连接放入连接池的时候会根据 releaseHealthCheck 判断,是否对连接进行健康检查。

  • 健康检查开启:连接处于活跃状态那么就把连接放到连接池里,如果放入成功则执行ChannelPoolHandler#channelRelease方法,否则的话就会把这个连接关闭,并把结果通知Promise。如果连接没有处于活跃状态,只执行ChannelPoolHandler#channelRelease,并把结果通知Promise。
  • 健康检查关闭:直接将连接放到连接池,如果失败,则关闭这个连接。

SimpleChannelPool实现了连接池的基本功能,但是他不能支持限制连接池的连接数,所以在生产环境我们需要使用 FixedChannelPool

FixedChannelPool

public FixedChannelPool(Bootstrap bootstrap,
                            ChannelPoolHandler handler,
                            ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
                            final long acquireTimeoutMillis,
                            int maxConnections, int maxPendingAcquires,
                            boolean releaseHealthCheck, boolean lastRecentUsed) {
        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
        }
        if (maxPendingAcquires < 1) {
            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
        }
        if (action == null && acquireTimeoutMillis == -1) {
            timeoutTask = null;
            acquireTimeoutNanos = -1;
        } else if (action == null && acquireTimeoutMillis != -1) {
            throw new NullPointerException("action");
        } else if (action != null && acquireTimeoutMillis < 0) {
            throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
        } else {
            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
            switch (action) {
            case FAIL:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(new TimeoutException(
                                "Acquire operation took longer then configured maximum time") {
                            @Override
                            public Throwable fillInStackTrace() {
                                return this;
                            }
                        });
                    }
                };
                break;
            case NEW:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Increment the acquire count and delegate to super to actually acquire a Channel which will
                        // create a new connection.
                        task.acquired();

                        FixedChannelPool.super.acquire(task.promise);
                    }
                };
                break;
            default:
                throw new Error();
            }
        }
        executor = bootstrap.config().group().next();
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }
复制代码
  • maxConnections 连接池中的最大连接数。
  • acquireTimeoutNanos 等待连接池连接的最大时间,单位毫秒。
  • maxPendingAcquires 在请求获取/建立连接大于maxConnections数时,创建等待建立连接的最大定时任务数量。例如maxConnections=2,此时已经建立了2连接,但是没有放入到连接池中,接下来的请求就会放入到一个后台执行的定时任务中,如果到了时间连接池中还没有连接,就可以建立不大于 maxPendingAcquires 的连接数,如果连接池中有连接了就从连接池中获取。
  • executor 用于执行获取连接和释放连接的EventLoop。
  • TimeoutTask.FAIL :如果连接池中没有可用连接了,等待 acquireTimeoutNanos 后,抛出一个超时异常。
  • TimeoutTask.NEW :如果连接池中没有可用连接了,等待 acquireTimeoutNanos 后,创建一个新的连接。

FixedChannelPool#acquire

@Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        // 使用同一个executor保证线程安全
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        acquire0(promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
        return promise;
    }
    
    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
            return;
        }
        if (acquiredChannelCount.get() < maxConnections) {
            assert acquiredChannelCount.get() >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            // 创建一个新的Promise
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            super.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                tooManyOutstanding(promise);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    tooManyOutstanding(promise);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }
复制代码

FixedChannelPool重写了SimpleChannelPool的 acquire(final Promise<Channel> promise) 方法,把所有的获取连接的任务 acquire0 交给一个EventLoop执行,让获取我而不是像SimpleChannelPool中每获取一个连接都使用线程选择器选择一个EventLoop,用此来保证在高并发的情况下 acquiredChannelCountd < maxConnections 的安全性,可以创建预期的连接数。所以在 acquire0 中需要为FixedChannelPool中的EventLoop创建一个新的promise,然后调用SimpleChannelPool的 acquire(final Promise<Channel> promise) 方法用来建立一个新的连接或者从连接池中获取一个连接。

每创建一个连接并且这个连接没有放入到连接池时 acquiredChannelCount 都会增加加1,用来保证不创建超过 maxConnections 的连接数。 acquiredChannelCountd > maxConnections 的时候,FixedChannelPool会根据 pendingAcquireCount 的值来判断是否创建一个定时的任务,去建立新的连接。

private abstract class TimeoutTask implements Runnable {              
    @Override                                                         
    public final void run() {                                         
        assert executor.inEventLoop();                                
        long nanoTime = System.nanoTime();                            
        for (;;) {                                                    
            AcquireTask task = pendingAcquireQueue.peek();            
            // 检查是否到了执行时间        
            if (task == null || nanoTime - task.expireNanoTime < 0) { 
                break;                                                
            }                                                         
            pendingAcquireQueue.remove();                             
                                                                      
            --pendingAcquireCount;                                    
            onTimeout(task);                                          
        }                                                             
    }                                                                 
                                                                      
    public abstract void onTimeout(AcquireTask task);                 
}                                                                     
                                                                      
复制代码

FixedChannelPool#release

释放连接的时候同样使用FixedChannelPool中的EventLoop保证高并发下的线程安全问题,在释放连接的时候会执行 decrementAndRunTaskQueue() 方法,成功后会尝试终止定时任务,从连接池中返回连接到promise。

private void decrementAndRunTaskQueue() {
        // We should never have a negative value.
        int currentCount = acquiredChannelCount.decrementAndGet();
        assert currentCount >= 0;
        runTaskQueue();
    }
    
private void runTaskQueue() {
        while (acquiredChannelCount.get() < maxConnections) {
            AcquireTask task = pendingAcquireQueue.poll();
            if (task == null) {
                break;
            }

            // Cancel the timeout if one was scheduled
            ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false);
            }

            --pendingAcquireCount;
            task.acquired();

            super.acquire(task.promise);
        }

        // We should never have a negative value.
        assert pendingAcquireCount >= 0;
        assert acquiredChannelCount.get() >= 0;
    }    
    
    
复制代码
原文  https://juejin.im/post/5e9942e2f265da47d00a6776
正文到此结束
Loading...