简单找了下发现网上没有关于Netty3比较完整的源码解析的文章,于是我就去读官方文档,为了加强记忆,翻译成了中文,有适当的简化。
原文档地址: Netty3文档
运行demo的前提有两个:最新版本的Netty3和JDK1.5以上
最简单的协议就是Discard协议——忽略所有接收到的数据并且不作任何响应。我们从Netty处理I/O事件的handler实现开始:
public class DiscardServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
接下来写一个main方法来开启使用DiscardServerHandler的服务:
public class DiscardServer {
public static void main(String[] args) throws Exception {
ChannelFactory factory =
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new DiscardServerHandler());
}
});
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.bind(new InetSocketAddress(8080));
}
}
我们可以通过"telnet localhost 8080"命令去测试服务,但因为是Discard服务,我们都不知道服务是否正常工作。所以我们修改下服务,让它打印出接收到的数据。
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
while(buf.readable()) {
System.out.println((char) buf.readByte());
System.out.flush();
}
}
一个服务通常对请求是有响应的。接下来我们尝试写一个实现Echo协议——将接收的数据原路返回给客户端的服务:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Channel ch = e.getChannel();
ch.write(e.getMessage());
}
这次我们实现一个时间协议——在不需要任何请求数据的情况下返回一个32位整型数字并且在发送之后关闭连接。因为我们忽略请求数据,只需要在连接建立的发送消息,所以这次不能使用messageReceived方法而是重写channelConnected方法:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel ch = e.getChannel();
ChannelBuffer time = ChannelBuffers.buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
ChannelFuture f = ch.write(time);
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
Channel ch = future.getChannel();
ch.close();
}
});
}
我们还需要一个遵守时间协议,即能把整型数字翻译成日期的客户端。Netty服务端和客户端唯一的区别就是要求不同的Bootstrap和ChannelFactory:
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
ChannelFactory factory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(new TimeClientHandler());
}
});
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
bootstrap.connect(new InetSocketAddress(host, port));
}
另外我们需要一个ChannelHandler实现,负责把接收到服务端返回的32位整型数字翻译成日期并打印出来,然后断开连接:
public class TimeClientHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
long currentTimeMillis = buf.readInt() * 1000L;
System.out.println(new Date(currentTimeMillis));
e.getChannel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
e.getChannel().close();
}
}
看上去很简单是吧?但是实际运行过程中这个handler有时会抛出一个IndexOutOfBoundsException。下一节我们会讨论为什么会这样。
在像TCP/IP那样基于流的传输中,接收数据保存在一个socket接收缓存中。但是这个缓存不是一个以包为单位的队列,而是一个以字节为单位的队列。这就意味着,即使发送两个独立的消息,操作系统会把他们视为一个字节串。因此,不能保证你读到的和另一端写入的一样。所以,不管是客户端还是服务端,对于接收到的数据都需要整理成符合应用程序逻辑的结构。
回到前面的时间客户端的问题,32位整型数字很小,但是它也是可以拆分的,特别是当流量上升的时候,被拆分的可能性也随之上升。
一个简单的处理方式就是内部创建一个累计的缓存,直到接收满4个字节才进行处理。
private final ChannelBuffer buf = dynamicBuffer();
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer m = (ChannelBuffer) e.getMessage();
buf.writeBytes(m);
if (buf.readableBytes() >= 4) {
long currentTimeMillis = buf.readInt() * 1000L;
System.out.println(new Date(currentTimeMillis));
e.getChannel().close();
}
}
第一种方案有很多问题,比如一个复杂的协议,由多个可变长度的域组成,这种情况下第一种方案的handler就无法支持了。
你会发现你可以添加多个ChannelHandler到ChannelPipeline中,利用这个特性,你可以把一个臃肿的ChannelHandler拆分到多个模块化的ChannelHandler中,这样可以降低应用程序的复杂度。比如,你可以把TimeClientHandler拆分成两个handler:
Netty提供了可扩展的类帮助你实现TimeDecoder:
public class TimeDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
}
return buffer.readBytes(4);
}
}
拆分之后,我们需要修改TimeClient的ChannelPipelineFactory实现:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new TimeDecoder(),
new TimeClientHandler());
}
});
Netty还提供了进一步简化解码的ReplayingDecoder:
public class TimeDecoder extends ReplayingDecoder<VoidEnum> {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer, VoidEnum state) {
return buffer.readBytes(4);
}
}
此外,Netty提供了一批开箱即用的解码器,让你可以简单得实现大多数协议:
上面的demo我们都是用ChannelBuffer作为协议化消息的基本数据结构,这一节我们用POJO替代ChannelBuffer。将从ChannelBuffer提取信息的代码跟handler分离开,会使handler变得更加可维护的和可重用的。从上面的demo里不容易看出这个优势,但是实际应用中分离很有必要。
首先,我们定义一个类型UnixTime:
public class UnixTime {
private final int value;
public UnixTime(int value) {
this.value = value;
}
public int getValue() {
return value;
}
@Override
public String toString() {
return new Date(value * 1000L).toString();
}
}
现在我们可以修改TimeDecoder让它返回一个UnixTime而不是ChannelBuffer:
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
}
return new UnixTime(buffer.readInt());
}
编码器改了,那么相应的TimeClientHandler就不会继续使用ChannelBuffer:
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
UnixTime m = (UnixTime) e.getMessage();
System.out.println(m);
e.getChannel().close();
}
同样的技术也可以应用到服务端的TimeServerHandler上:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
UnixTime time = new UnixTime((int)(System.currentTimeMillis() / 1000));
ChannelFuture f = e.getChannel().write(time);
f.addListener(ChannelFutureListener.CLOSE);
}
能这样运用的前提是有一个编码器,可以把UnixTime对象翻译成ChannelBuffer:
public class TimeEncoder extends SimpleChannelHandler {
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
UnixTime time = (UnixTime) e.getMessage();
ChannelBuffer buf = buffer(4);
buf.writeInt(time.getValue());
Channels.write(ctx, e.getFuture(), buf);
}
}
同样,TimeEncoder也需要加入到服务端的ChannelPipeline中:
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new TimeServerHandler(),
new TimeEncoder());
}
});
为了关闭I/O线程让应用程序优雅得退出,我们需要释放ChannelFactory分配的资源。
一个典型网络应用程序的关闭过程分为三步:
应用到TimeClient上:
ChannelFuture future = bootstrap.connect(...);
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
}
future.getChannel().getCloseFuture().awaitUninterruptibly();
factory.releaseExternalResources();
关闭一个客户端很简单,那服务端呢?你需要从端口解绑并关闭所有接收到的连接。前提是你需要一个保持跟踪活跃连接的数据结构,Netty提供了ChannelGroup。
ChannelGroup是Java集合API一个特殊的的扩展,它代表一组打开的Channel。如果一个Channel被添加到ChannelGroup,然后这个Channel被关闭了,它会从ChannelGroup中自动移除。你可以对同一ChannelGroup中的Channel做批量操作,比如在关闭服务的时候关闭所有Channel。
要跟踪打开的socket,你需要修改TimeServerHandler,把新打开的Channel添加到全局的ChannelGroup变量中。ChannelGroup是线程安全的。
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
TimeServer.allChannels.add(e.getChannel());
}
现在我们自动维持了一个包含所有活跃Channel的列表,关闭服务端就像关闭客户端一样容易了。
public class TimeServer {
static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");
public static void main(String[] args) throws Exception {
...
ChannelFactory factory = ...;
...
ServerBootstrap bootstrap = ...;
...
Channel channel = bootstrap.bind(new InetSocketAddress(8080));
allChannels.add(channel);
waitForShutdownCommand();
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
factory.releaseExternalResources();
}
}
这一节我们快速浏览了Netty,示范了如何用Netty写一个能正常工作的网络应用。
下一节将介绍Netty的更多细节。