从客户端发送字母A,经过解码器ByteToMessageDecoder、ReplayingDecoder、MessageToMessageDecoder以及编码器MessageToMessageEncoder、MessageToByteEncoder,把A解码为a、b、c,再编码成d、e
EchoServer
public class EchoServer { public static void main(String[] args) throws InterruptedException { // 创建NioEventLoopGroup类型的EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { // 创建ServerBootstrap ServerBootstrap sbs = new ServerBootstrap(); sbs.group(group) // 设置Channel为NIO的服务端Channel .channel(NioServerSocketChannel.class) // 绑定本地端口 .localAddress(new InetSocketAddress(Const.PORT)) // 新连接被接受时,会创建一个Channel // 再把把echoServerHandler加入到这个Channel的ChannelPipeline中 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new MyMessageToByteEncoder()); socketChannel.pipeline().addLast(new MyMessageToMessageEncoder()); socketChannel.pipeline().addLast(new MyByteToMessageDecoder()); socketChannel.pipeline().addLast(new MyReplayingDecoder()); socketChannel.pipeline().addLast(new MyMessageToMessageDecoder()); } }); // 异步绑定服务器,阻塞到服务器绑定完成 ChannelFuture sync = sbs.bind().sync(); // 获取channel的closeFuture,阻塞到关闭 sync.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放所有的资源 group.shutdownGracefully().sync(); } } }
MyByteToMessageDecoder
public class MyByteToMessageDecoder extends ByteToMessageDecoder { /** * @param ctx * @param in 传过来的ByteBuf * @param out 添加解码消息的List * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String str = ""; str += (char) in.readByte(); System.out.println("MyByteToMessageDecoder receive:" + str); str = "a"; ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeBytes(str.getBytes()); out.add(byteBuf); System.out.println("MyByteToMessageDecoder send:" + str); } }
MyReplayingDecoder
public class MyReplayingDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { String str = ""; str += (char) in.readByte(); System.out.println("MyReplayingDecoder receive:" + str); str = "b"; out.add(str); System.out.println("MyReplayingDecoder send:" + str); } }
MyMessageToMessageDecoder
public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> { @Override protected void decode(ChannelHandlerContext ctx, String msg, List out) throws Exception { System.out.println("MyMessageToMessageDecoder receive:" + msg); //out.add(msg); msg = "c"; System.out.println("MyMessageToMessageDecoder send:" + msg); ctx.write(msg); } }
MyMessageToByteEncoder
public class MyMessageToByteEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { System.out.println("MyMessageToByteEncoder receive:" + msg); msg = "e"; System.out.println("MyMessageToByteEncoder send:" + msg); out.writeBytes(msg.getBytes()); ctx.writeAndFlush(out); } }
MessageToMessageEncoder
public class MyMessageToMessageEncoder extends MessageToMessageEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, List out) throws Exception { System.out.println("MyMessageToMessageEncoder receive:" + msg); msg = "d"; System.out.println("MyMessageToMessageEncoder send:" + msg); out.add(msg); } }
Client
public class Client { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { // 创建Bootstrap Bootstrap bs = new Bootstrap(); bs.group(group) // 设置Channel为NIO的客户端Channel .channel(NioSocketChannel.class) // 设置服务器的地址端口信息 .remoteAddress(new InetSocketAddress(Const.IP, Const.PORT)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); // 连接远程服务器,阻塞到连接完成 ChannelFuture cf = bs.connect().sync(); // 获取channel的closeFuture,阻塞到关闭 cf.channel().closeFuture().sync(); } finally { // 优雅的关掉group并释放所有的资源 group.shutdownGracefully().sync(); } } }
ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 客户端收到服务器消息后调用 * * @param channelHandlerContext * @param byteBuf * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { // 处理收到的消息 System.out.println("客户端收到信息:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 客户端与服务器连接后调用 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 当channel是活跃的时候,往服务端发送一条消息 ctx.writeAndFlush(Unpooled.copiedBuffer("A", CharsetUtil.UTF_8)); } /** * 客户端处理消息过程中,对异常的处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
服务端运行结果
客户端运行结果
在netty中,解码器包括两种:
decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
ByteBuf,是调用这个方法时,传入的数据,List是用来存放解码信息的。如果ByteBuf没有读取完,这个List有值,会传入ChannelPipeline的下一个ChannelInboundHandler。
除了decode,这个类还有decodeLast方法,当Channel的状态变为非活动时会被调用。
MessageToMessageDecoder把一种类型的消息转为另外一种,上面例子中,是把String转String,我们简单的做了字符串替换。msg的类型,是需要传入的类型,解析后存入List。
decode(ChannelHandlerContext ctx, String msg, List out)
编码器也有两种方式:
msg,是传入消息的类型及数据,out是会被传到ChannelPipeline的下一个ChannelOutboundHandler。
encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
msg,是传入消息的类型及数据,out是存放编码后的消息,也会被传到ChannelPipeline的下一个ChannelOutboundHandler。
encode(ChannelHandlerContext ctx, String msg, List out)