转载

从零学Netty之初探Netty

从零学Netty系列旨在记录笔者学习Netty的过程,从入门到熟练,尽量全面的对Netty做一次深入的探秘。

本文是 “从零学Netty” 系列的第一篇,我将介绍Netty的基本概念,线程模型,以及入门案例。

Netty基本概念

Netty是一个高性能网络通信框架,它提供了异步的、事件驱动的能力,能够帮助开发者快速开发高性能、高可靠的网络服务器和客户端程序。

Netty封装了Java NIO,屏蔽了复杂的底层实现,它具有以下优势:

  1. Netty具有一个活跃的社区
  2. Netty实现了各种协议,基本上不需要开发者对主流的协议进行二次开发
  3. Netty对线程,selector进行了优化,它的reactor线程模型有着良好的并发表现
  4. Netty具备完备的拆包解包,异常检测机制,能够让开发者专注于业务逻辑而不需要关心NIO的繁重细节
  5. Netty解决了JDK的原生NIO的很多包括空轮询在内的bug

Netty线程模型

Netty线程模型主要有三种

  • 单线程模型
  • 多线程模型
  • 主从多线程模型

我们主要关注它的主从线程模型,也就是经常听到的boss、worker线程模型。

主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。

更多关于Netty线程模型的内容可以参考 《Netty Reactor模型》

入门案例

我们通过一个入门案例来直观的感受一下Netty的魅力。

该案例为经典的echoServer,即客户端发出一行文字,服务端处理后原样返回给客户端。

服务端EchoServer.java

首先开发服务端核心类EchoServer.java

public class EchoServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer().bind(port);
    }

    public void bind(int port) throws Exception {

        // 创建EventLoopGroup实例
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            // 创建服务端辅助启动类ServerBootStrap对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 设置NIO线程组
            serverBootstrap.group(bossGroup, workerGroup)
                    // 设置NioServerSocketChannel,对应于JDK NIO类ServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    // 设置TCP参数,连接请求的最大队列长度
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 设置IO事件处理类,用来处理消息的编解码及我们的业务逻辑
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 绑定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            // 等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();

        } catch (Exception e) {
            throw e;
        } finally {
            // 优雅停机
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

这里对EventLoopGroup展开解释一下:

NioEventLoop是Netty的Reactor线程,它的职责如下:

作为服务端Acceptor线程,负责处理客户端的请求接入;

作为客户端Connecor线程,负责注册监听连接操作位,用于判断异步连接结果;

作为IO线程,监听网络读操作位,负责从SocketChannel中读取报文;

作为IO线程,负责向SocketChannel写入报文发送给对方,如果发生写半包,会自动注册监听写事件,用于后续继续发送半包数据,直到数据全部发送完成;

作为定时任务线程,可以执行定时任务,例如链路空闲检测和发送心跳消息等;

作为线程执行器可以执行普通的任务线程(Runnable)。

服务端I/O事件处理类EchoServerHandler.java

我们实现服务端I/O事件处理类EchoServerHandler,它的实例在EchoServer中通过 childHandler 进行设置,用于完成处理消息的编解码逻辑

/**
* @author snowalker
* @version 1.0
* @date 2019/8/13 9:50
* @className
* @desc 服务端I/O事件处理类
*/
public class EchoServerHandler extends SimpleChannelInboundHandler {

    private static final Logger LOGGER = Logger.getLogger("EchoServerHandler");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 接受客户端发来的数据,
        // 使用 buf.readableBytes() 获取数据大小,并转换为byte数组
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        String body = new String(req, "UTF-8");
        LOGGER.info("receive data from client:" + body);

        // 回写数据到客户端
        ByteBuf resp = Unpooled.copiedBuffer(body.getBytes());
        ctx.write(resp);
    }

    /**
    * 发生异常情况 关闭链路
    * @param ctx
    * @param cause
    * @throws Exception
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
    * 将发送缓冲区的消息全部写入SocketChannel中
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

客户端EchoClient.java

开发完服务端,我们接着编写客户端代码,监听服务端接口并发送报文给服务端。

public class EchoClient {

    private static final Logger LOGGER = Logger.getLogger("EchoClient");

    public static void main(String[] args) throws Exception  {
        int port = 8080;
        String localhost = "127.0.0.1";
        new EchoClient().connect(port, localhost);
    }

    public void connect(int port, String host) throws Exception {

        // 创建客户端处理IO读写的NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建客户端辅助启动类
            Bootstrap bootstrap = new Bootstrap();
            // 设置NIO线程组
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 配置客户端处理网络IO事件的类
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 发起连接操作
            ChannelFuture future = bootstrap.connect(host, port).sync();

            /////////////////模拟粘包/半包//////////////////////////
            for (int i = 0; i < 100; i++) {
                // 构造客户端发送的数据ByteBuf对象
                byte[] req = "玩转Netty实战".getBytes();
                ByteBuf messageBuffer = Unpooled.buffer(req.length);
                messageBuffer.writeBytes(req);

                // 向服务端发送数据
                ChannelFuture channelFuture = future.channel().writeAndFlush(messageBuffer);
                channelFuture.syncUninterruptibly();

            }
            /////////////////模拟粘包/半包//////////////////////////

            // 等待客户端链路关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            throw e;
        } finally {
            group.shutdownGracefully();
            LOGGER.info("[EchoClient] has been shutdown");
        }
    }
}

这里通过循环向服务端发送报文。

客户端处理IO事件处理器EchoClientHandler.java

我们接着实现客户端处理IO事件处理器,并将其通过 handler() 设置到客户端线程组中。

/**
* @author snowalker
* @version 1.0
* @date 2019/8/13 11:09
* @className EchoClientHandler
* @desc 处理客户端IO事件
*/
public class EchoClientHandler extends SimpleChannelInboundHandler {

    private static final Logger LOGGER = Logger.getLogger("EchoClientHandler");

    /**
    * 服务端响应请求返回会回调该方法
    * 通过实现该方法来实现对服务端返回数据进行处理的逻辑
    * 从而实现客户端业务逻辑
    * @param ctx
    * @param msg
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 获取服务端返回的数据buf
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);

        // 将服务端返回的byte数组转换成字符串 打印
        String body = new String(req, "UTF-8");
        LOGGER.info("receive data from server:" + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        LOGGER.warning("exception occurred:" + cause.getMessage());
        ctx.close();
    }
}

代码运行

我们先后启动服务端,客户端,观察控制台返回。

可以看到客户端的控制台打印日志如下:

八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战
八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战玩转Netty实战玩转Netty实战
八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战
八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战
八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战玩转Netty实战
八月 13, 2019 11:21:35 上午 com.snowalker.client.EchoClientHandler channelRead0
信息: receive data from server:玩转Netty实战

服务端也打印出对应的日志。

表明我们通过Netty开发的EchoServer运行符合预期,有心的同学可能已经发现,我们的日志中存在一行打印多个报文的情况。如:

信息: receive data from server:玩转Netty实战玩转Netty实战玩转Netty实战

这其实就是发生了粘包现象,也就是而这在实战中是不允许出现的。

除了粘包,还有拆包现象;

粘包,直观感受就是多个字符串“粘”在了一起,即一行传输了多个报文;

拆包,与粘包相反,一个字符串被“拆”开,形成一个破碎的包。

而这就是需要我们去解决的,关于如何解决粘包、拆包的问题,我们在接下来的文章中进行解析。

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

原文  http://wuwenliang.net/2019/08/13/从零学Netty之初探Netty/
正文到此结束
Loading...