本示例从客户端发送字母A,服务端依次经过解码器ByteToMessageDecoder、ReplayingDecoder、MessageToMessageDecoder,把A逐步解码为a、b、c;再依次经过编码器MessageToMessageEncoder、MessageToByteEncoder,把c编码成d、e,最后把e返回给客户端。
EchoServer
public class EchoServer {
    /**
     * Starts the demo server on {@code Const.PORT} and blocks until the server channel closes.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting on bind, close or shutdown
     */
    public static void main(String[] args) throws InterruptedException {
        // One NIO event loop group is handed to the bootstrap.
        EventLoopGroup eventLoops = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(eventLoops);
            // Use the NIO server-side channel implementation.
            bootstrap.channel(NioServerSocketChannel.class);
            // Local port to bind.
            bootstrap.localAddress(new InetSocketAddress(Const.PORT));
            // For every accepted connection, install the codec handlers on the new channel's pipeline.
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // The two encoders are registered first, followed by the three decoders.
                    ch.pipeline()
                            .addLast(new MyMessageToByteEncoder())
                            .addLast(new MyMessageToMessageEncoder())
                            .addLast(new MyByteToMessageDecoder())
                            .addLast(new MyReplayingDecoder())
                            .addLast(new MyMessageToMessageDecoder());
                }
            });
            // Bind and wait until binding has completed.
            ChannelFuture bound = bootstrap.bind().sync();
            // Wait until the server channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            // Shut the group down gracefully and wait for it to finish.
            eventLoops.shutdownGracefully().sync();
        }
    }
}
 MyByteToMessageDecoder
public class MyByteToMessageDecoder extends ByteToMessageDecoder {
    /**
     * Reads a single byte from the inbound buffer, logs it, and emits a fresh buffer
     * containing the bytes of {@code "a"}.
     *
     * @param ctx the handler context (not used here)
     * @param in  the inbound {@link ByteBuf}
     * @param out the list that collects decoded messages
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        char received = (char) in.readByte();
        System.out.println("MyByteToMessageDecoder receive:" + received);

        // The received byte is only logged; the emitted payload is always "a".
        final String reply = "a";
        ByteBuf payload = Unpooled.buffer().writeBytes(reply.getBytes());
        out.add(payload);
        System.out.println("MyByteToMessageDecoder send:" + reply);
    }
}
 MyReplayingDecoder
public class MyReplayingDecoder extends ReplayingDecoder<Void> {
    /**
     * Reads a single byte from the inbound buffer, logs it, and emits the string {@code "b"}.
     *
     * @param ctx the handler context (not used here)
     * @param in  the inbound {@link ByteBuf}
     * @param out the list that collects decoded messages
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        char received = (char) in.readByte();
        System.out.println("MyReplayingDecoder receive:" + received);

        // The received byte is only logged; the emitted message is always "b".
        final String reply = "b";
        out.add(reply);
        System.out.println("MyReplayingDecoder send:" + reply);
    }
}
 MyMessageToMessageDecoder
public class MyMessageToMessageDecoder extends MessageToMessageDecoder<String> {
    /**
     * Logs the incoming string and answers the peer with the string {@code "c"}.
     *
     * @param ctx the handler context used to send the reply
     * @param msg the incoming message
     * @param out the list that collects decoded messages; intentionally left empty here
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        System.out.println("MyMessageToMessageDecoder receive:" + msg);
        final String reply = "c";
        System.out.println("MyMessageToMessageDecoder send:" + reply);
        // Nothing is added to 'out': instead of passing a message further inbound,
        // the reply is written outbound from this handler's context.
        // writeAndFlush (rather than write) makes sure the reply actually leaves the
        // channel without relying on some other handler to trigger the flush.
        ctx.writeAndFlush(reply);
    }
}
 MyMessageToByteEncoder
public class MyMessageToByteEncoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageToByteEncoder receive:" + msg);
        msg = "e";
        System.out.println("MyMessageToByteEncoder send:" + msg);
        out.writeBytes(msg.getBytes());
        ctx.writeAndFlush(out);
    }
} 
 MyMessageToMessageEncoder
public class MyMessageToMessageEncoder extends MessageToMessageEncoder<String> {
    /**
     * Logs the incoming string and emits the string {@code "d"} as the encoded message.
     *
     * @param ctx the handler context (not used here)
     * @param msg the incoming message
     * @param out the list that collects encoded messages
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        System.out.println("MyMessageToMessageEncoder receive:" + msg);
        // The incoming message is only logged; the emitted message is always "d".
        final String encoded = "d";
        System.out.println("MyMessageToMessageEncoder send:" + encoded);
        out.add(encoded);
    }
}
 Client
public class Client {
    /**
     * Connects to {@code Const.IP}:{@code Const.PORT} and blocks until the channel closes.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while waiting on connect, close or shutdown
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoops = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoops);
            // Use the NIO client-side channel implementation.
            bootstrap.channel(NioSocketChannel.class);
            // Address and port of the server to connect to.
            bootstrap.remoteAddress(new InetSocketAddress(Const.IP, Const.PORT));
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new ClientHandler());
                }
            });
            // Connect and wait until the connection attempt has completed.
            ChannelFuture connected = bootstrap.connect().sync();
            // Wait until the channel is closed.
            connected.channel().closeFuture().sync();
        } finally {
            // Shut the group down gracefully and wait for it to finish.
            eventLoops.shutdownGracefully().sync();
        }
    }
}
 ClientHandler
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * Called once the channel becomes active; sends the letter {@code "A"} to the server.
     *
     * @param ctx the handler context
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf greeting = Unpooled.copiedBuffer("A", CharsetUtil.UTF_8);
        ctx.writeAndFlush(greeting);
    }

    /**
     * Called when a message arrives from the server; prints its content decoded as UTF-8.
     *
     * @param ctx the handler context (not used here)
     * @param msg the received buffer
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        String text = msg.toString(CharsetUtil.UTF_8);
        System.out.println("客户端收到信息:" + text);
    }

    /**
     * Called when an exception is raised while handling events; prints the stack trace
     * and closes the channel.
     *
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
 服务端运行结果
 
 
客户端运行结果
 
 
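在netty中,解码器包括两种:一种把字节解码为消息(ByteToMessageDecoder及其子类ReplayingDecoder),另一种把一种消息解码为另一种消息(MessageToMessageDecoder)。前一种的decode方法如下: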
decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
ByteBuf是调用这个方法时传入的数据,List用来存放解码后的消息。只要ByteBuf中还有可读数据,decode就会被反复调用;List中只要有值,其中的消息就会被传给ChannelPipeline中的下一个ChannelInboundHandler。
 
 
除了decode,这个类还有decodeLast方法,当Channel的状态变为非活动时会被调用。
MessageToMessageDecoder把一种类型的消息转为另外一种。上面的例子中是把String转为String,我们只是简单地做了字符串替换。msg的类型就是传入消息的类型,解码后的结果存入List。
decode(ChannelHandlerContext ctx, String msg, List out)
编码器也有两种方式:MessageToByteEncoder把消息编码为字节,MessageToMessageEncoder把一种消息编码为另一种消息。
MessageToByteEncoder的encode方法中,msg是传入的消息(类型及数据),out是存放编码后字节的ByteBuf,会被传到ChannelPipeline的下一个ChannelOutboundHandler:
encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
MessageToMessageEncoder的encode方法中,msg是传入的消息(类型及数据),out是存放编码后消息的List,其中的消息也会被传到ChannelPipeline的下一个ChannelOutboundHandler:
encode(ChannelHandlerContext ctx, String msg, List out)