Reposted

Netty Core Source Code Walkthrough: ServerBootstrap


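This article walks through the source code of ServerBootstrap (Netty 4.1.51.Final-SNAPSHOT). Why start with ServerBootstrap? When I built my first project on Netty, the first line of code I wrote was new ServerBootstrap(). ServerBootstrap is the class that starts a Netty server, so it is the most natural place to begin learning Netty.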

ServerBootstrap

## TcpServer.java


private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);
private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);

public void init() throws Exception {
    // Start the server
    ServerBootstrap bootstrap = new ServerBootstrap();

    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
    // Optional parameters
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);

    // Bind the port and wait synchronously until the bind succeeds
    ChannelFuture future = bootstrap.bind(port).sync();
    ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
        }
    });
}

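This is a code snippet from the Netty server I wrote for a TCP gateway (https://github.com/SongranZhang/tcp-gateway/blob/master/src/main/java/com/linkedkeeper/tcp/connector/tcp/server/TcpServer.java). As it shows, initializing a Netty server starts by creating an object with the no-argument constructor of ServerBootstrap, followed by a series of configuration calls on that object: group(), channel(), childHandler(), and childOption(). Each of these methods returns the bootstrap itself, so they can also be written as the chain bootstrap.group().channel().childHandler().childOption(). The call that actually starts the server is bootstrap.bind(port).sync(). The sections below analyze each of these methods in turn.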

First, the group() method.

## ServerBootstrap.java


private volatile EventLoopGroup childGroup;

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}

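Here workerGroup is assigned to childGroup in ServerBootstrap, and bossGroup is assigned to group in the parent class AbstractBootstrap. Each field can be set only once; a second call throws IllegalStateException.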

## AbstractBootstrap.java


volatile EventLoopGroup group;

public B group(EventLoopGroup group) {
    ObjectUtil.checkNotNull(group, "group");
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return self();
}
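
The fluent style rests on two details visible above: group() returns self(), typed as the concrete subclass B, and each field may be assigned only once. The following is a minimal sketch of that pattern in plain Java, not Netty's code. BaseBootstrap and DemoServerBootstrap are hypothetical names, and a String stands in for EventLoopGroup.

```java
import java.util.Objects;

// Hypothetical stand-in for AbstractBootstrap; a String replaces EventLoopGroup.
abstract class BaseBootstrap<B extends BaseBootstrap<B>> {
    private volatile String group;

    // Returns this object typed as the concrete subclass, so chained calls keep that type.
    @SuppressWarnings("unchecked")
    B self() {
        return (B) this;
    }

    B group(String group) {
        Objects.requireNonNull(group, "group");
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return self();
    }

    String group() {
        return group;
    }
}

// Hypothetical stand-in for ServerBootstrap.
final class DemoServerBootstrap extends BaseBootstrap<DemoServerBootstrap> {
    private volatile String childGroup;

    DemoServerBootstrap group(String parentGroup, String childGroup) {
        super.group(parentGroup); // the parent group is stored by the base class
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = Objects.requireNonNull(childGroup, "childGroup");
        return this;
    }

    String childGroup() {
        return childGroup;
    }
}
```

With this sketch, new DemoServerBootstrap().group("boss", "worker") succeeds once, and calling group() again on the same object throws IllegalStateException.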

Next is the channel() method.

## AbstractBootstrap.java


private volatile ChannelFactory<? extends C> channelFactory;

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

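Here NioServerSocketChannel.class is wrapped in a ReflectiveChannelFactory, which is then assigned to channelFactory in AbstractBootstrap. No channel is created at this point; the factory instantiates NioServerSocketChannel by reflection later, during bind().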
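
To make the deferred instantiation concrete, here is a minimal sketch of a reflective factory in plain Java. DemoReflectiveFactory is a hypothetical name, and any class with a public no-argument constructor can stand in for a channel class; Netty's own ReflectiveChannelFactory is shown later in this article.

```java
import java.lang.reflect.Constructor;

// Hypothetical, simplified factory: looks up the no-arg constructor once,
// then creates a fresh instance on every newInstance() call.
final class DemoReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    DemoReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Only the constructor is resolved here; no instance exists yet.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " does not have a public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }
}
```

Resolving the constructor in the factory's constructor means a class without a public no-argument constructor is rejected at configuration time rather than at bind time.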

Next is the childHandler() method.

## ServerBootstrap.java


private volatile ChannelHandler childHandler;

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
    return this;
}

This assigns childHandler in ServerBootstrap. The handler is not used yet; it is handed to ServerBootstrapAcceptor later, in init().

Finally, the childOption() method.

## ServerBootstrap.java


private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    ObjectUtil.checkNotNull(childOption, "childOption");
    synchronized (childOptions) {
        if (value == null) {
            childOptions.remove(childOption);
        } else {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

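Here childOptions holds the ChannelOption settings (TCP parameters among them) for the child channels, that is, the connections the server accepts. Passing a null value removes the option.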
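
The put-or-remove behavior can be illustrated with a small sketch in plain Java. DemoChildOptions is a hypothetical class, and option names are plain strings here rather than ChannelOption instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the childOptions handling in ServerBootstrap.
final class DemoChildOptions {
    // LinkedHashMap keeps the order in which options were added.
    private final Map<String, Object> childOptions = new LinkedHashMap<>();

    DemoChildOptions childOption(String name, Object value) {
        Objects.requireNonNull(name, "name");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(name); // null means "unset this option"
            } else {
                childOptions.put(name, value);
            }
        }
        return this;
    }

    // Copies the map under the same lock, so callers never see a half-updated view.
    Map<String, Object> snapshot() {
        synchronized (childOptions) {
            return new LinkedHashMap<>(childOptions);
        }
    }
}
```

Setting an option twice keeps only the last value, and setting it to null afterwards leaves no entry at all.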

In short, bootstrap.group().channel().childHandler().childOption() only collects the parameters used to build the Netty server. Now let's look at bootstrap.bind(port).sync().

First, the bind() method.

## AbstractBootstrap.java


public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

Here validate() checks that group and channelFactory in AbstractBootstrap are non-null, and then doBind() is called.

## AbstractBootstrap.java


private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
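
doBind() has two paths: if registration has already finished, it binds right away; otherwise it attaches a listener and binds when registration completes. The sketch below models that branching with the JDK's CompletableFuture instead of Netty's ChannelFuture. DemoBind and finishBind are hypothetical names, the "bind" is just a string result, and unlike Netty the sketch always returns a new promise, even when registration has already failed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the two paths in doBind(), using JDK futures only.
final class DemoBind {
    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone()) {
            // Fast path: registration already finished, bind immediately.
            finishBind(regFuture, port, promise);
        } else {
            // Slow path: bind from a callback once registration completes.
            regFuture.whenComplete((ignored, cause) -> finishBind(regFuture, port, promise));
        }
        return promise;
    }

    private static void finishBind(CompletableFuture<Void> regFuture, int port,
                                   CompletableFuture<String> promise) {
        if (regFuture.isCompletedExceptionally()) {
            promise.completeExceptionally(new IllegalStateException("registration failed"));
        } else {
            promise.complete("bound:" + port);
        }
    }
}
```

Either way the caller receives a promise immediately and never has to know which path was taken.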

Let's look at the initAndRegister() method first.

## AbstractBootstrap.java


final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

Here channelFactory.newChannel() invokes the newChannel() method of ReflectiveChannelFactory.

## ReflectiveChannelFactory.java


private final Constructor<? extends T> constructor;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}

public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

Here constructor.newInstance() returns a new instance of NioServerSocketChannel. Once the channel is obtained, init(channel) initializes it: first it applies the options and attrs, then it sets up the pipeline.

## ServerBootstrap.java


void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
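
Note that init() does not add the real handlers directly. It adds a ChannelInitializer, whose initChannel() runs later and only then installs the handler and the ServerBootstrapAcceptor. The sketch below models this deferred, one-shot setup in plain Java. DemoPipeline and its methods are hypothetical, handlers are plain strings, and the detail that the initializer is discarded after running once reflects how ChannelInitializer behaves in Netty rather than anything shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a pipeline whose handlers are installed lazily by an initializer.
final class DemoPipeline {
    private final List<String> handlers = new ArrayList<>();
    private Consumer<DemoPipeline> initializer;

    void addLast(String handlerName) {
        handlers.add(handlerName);
    }

    // Stores the setup logic without running it (the role played by ChannelInitializer).
    void addInitializer(Consumer<DemoPipeline> initializer) {
        this.initializer = initializer;
    }

    // Simulates the "channel registered" event: run the initializer once, then drop it.
    void fireRegistered() {
        Consumer<DemoPipeline> pending = initializer;
        if (pending != null) {
            initializer = null;
            pending.accept(this);
        }
    }

    List<String> names() {
        return new ArrayList<>(handlers);
    }
}
```

Until fireRegistered() is called the pipeline stays empty, which mirrors the fact that the server channel's handlers only appear once the channel has been registered with an event loop.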

Back in initAndRegister(), init(channel) is followed by register(channel). This method is implemented in MultithreadEventLoopGroup, the parent class of NioEventLoopGroup. We will analyze it when we read the NioEventLoop source code.

## MultithreadEventLoopGroup.java


public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
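
The excerpt shows that register() delegates to whichever event loop next() returns, but not how next() chooses. The sketch below assumes a simple round-robin choice, which is an assumption made for illustration and is not taken from the code above. DemoLoopGroup is a hypothetical class, and event loops are represented by their names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a group that hands channels to its loops in round-robin order.
final class DemoLoopGroup {
    private final String[] loops;
    private final AtomicInteger index = new AtomicInteger();

    DemoLoopGroup(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        loops = new String[size];
        for (int i = 0; i < size; i++) {
            loops[i] = "loop-" + i;
        }
    }

    // Assumed selection strategy: rotate through the loops; floorMod keeps the
    // index valid even after the counter overflows to a negative value.
    String next() {
        return loops[Math.floorMod(index.getAndIncrement(), loops.length)];
    }

    // Mirrors the shape of register(channel): pick a loop, then register on it.
    String register(String channel) {
        return channel + "@" + next();
    }
}
```

With two loops, three consecutive registrations land on loop-0, loop-1, and loop-0 again.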

With initAndRegister() covered, we return to doBind() and continue with doBind0().

## AbstractBootstrap.java


private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

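Here, if regFuture.isSuccess() returns true, channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); is executed; otherwise promise.setFailure(regFuture.cause()); is executed. The promise can be thought of as a special kind of Future, one whose result can be set by the code that holds it. The bind itself is carried out in the ChannelPipeline, which we will analyze when we read the ChannelPipeline source code.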
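
doBind0() submits the bind as a task to the channel's event loop, and the comment in initAndRegister() explains why this is safe: registration and bind run on the same thread, so the bind task runs after the registration task queued before it. The sketch below demonstrates that ordering guarantee with a JDK single-thread executor standing in for the event loop. DemoEventLoopOrdering is a hypothetical class, and the tasks only record their names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: a single-thread executor runs tasks in submission order.
final class DemoEventLoopOrdering {
    static List<String> run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        try {
            // Queued first, like the registration task.
            eventLoop.execute(() -> events.add("register"));
            // Queued second, like the bind task in doBind0().
            Future<?> bind = eventLoop.submit(() -> {
                events.add("bind");
            });
            bind.get(); // wait until the second task has run
            return events;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }
}
```

Because one thread drains the queue in FIFO order, no extra synchronization between the two tasks is needed.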

Finally, let's look at sync() in bootstrap.bind(port).sync(). bootstrap.bind(port) returns a ChannelFuture whose concrete type is DefaultChannelPromise (or a subclass of it), so sync() here calls the method in DefaultChannelPromise.

## DefaultChannelPromise.java


public ChannelPromise sync() throws InterruptedException {
    super.sync();
    return this;
}

Here super.sync(); calls the method of the parent class, DefaultPromise.

## DefaultPromise.java


public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
}

public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        return this;
    }

    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

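Here the while (!isDone()) loop keeps the calling thread in wait() until the promise completes, so a thread that calls sync() blocks until the bind succeeds or fails. If it failed, rethrowIfFailed() rethrows the cause.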
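
The wait/notify mechanics behind sync() can be reproduced in a few lines of plain Java. The following is a heavily simplified sketch, not Netty's DefaultPromise: DemoPromise and demo() are hypothetical, there is no deadlock check or waiter counting, sync() returns the result instead of the promise, and a failure is rethrown wrapped in an IllegalStateException.

```java
// Hypothetical, heavily simplified stand-in for DefaultPromise.
final class DemoPromise<V> {
    private V result;
    private Throwable cause;
    private boolean done;

    synchronized boolean isDone() {
        return done;
    }

    synchronized void setSuccess(V value) {
        complete(value, null);
    }

    synchronized void setFailure(Throwable failure) {
        complete(null, failure);
    }

    // Called only while holding the monitor of this object.
    private void complete(V value, Throwable failure) {
        if (done) {
            throw new IllegalStateException("complete already");
        }
        result = value;
        cause = failure;
        done = true;
        notifyAll(); // wake every thread blocked in await()
    }

    DemoPromise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        synchronized (this) {
            while (!done) { // loop guards against spurious wakeups
                wait();
            }
        }
        return this;
    }

    // Blocks until completion, then returns the result or rethrows the failure (wrapped).
    V sync() throws InterruptedException {
        await();
        synchronized (this) {
            if (cause != null) {
                throw new IllegalStateException(cause);
            }
            return result;
        }
    }

    // Completes the promise from another thread while the caller blocks in sync().
    static String demo(boolean succeed) {
        DemoPromise<String> promise = new DemoPromise<>();
        Thread worker = new Thread(() -> {
            if (succeed) {
                promise.setSuccess("bound");
            } else {
                promise.setFailure(new java.io.IOException("address in use"));
            }
        });
        worker.start();
        try {
            return promise.sync();
        } catch (IllegalStateException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

The caller of demo() plays the role of the thread that calls bootstrap.bind(port).sync(), and the worker thread plays the role of the event loop that completes the promise.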

Summary

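This article took quite a while to write. It introduced ServerBootstrap, the main class for building a Netty server. ServerBootstrap mostly assigns configuration properties, creates the Channel and its ChannelPipeline, and finally binds the local port and starts listening for I/O events. In later articles I will continue with Netty's EventLoop; please follow my personal blog or WeChat official account.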

Original article: http://mp.weixin.qq.com/s?__biz=MzI3MDU5OTU0MA==&mid=2247484419&idx=1&sn=4f98a42bd66ab898b6b2fe3495d7be4a