转载

使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆哪吒的狗头~(上)

  • 1. 概述
  • 2. 构建 Netty 服务端与客户端
  • 3. 通信协议
  • 4. 消息分发
  • 5. 断开重连
  • 6. 心跳机制与空闲检测
  • 7. 认证逻辑
  • 8. 单聊逻辑
  • 9. 群聊逻辑
  • 666. 彩蛋
  • 本文在提供完整代码示例  扫一扫下面

    使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆哪吒的狗头~(上)

    原创不易,给点个 哪吒 嘿,一起冲鸭!

    1. 概述

    Netty 是一个 Java 开源框架。

    Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

    也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

    Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

    下面,我们来新建三个项目,如下图所示:

    使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆哪吒的狗头~(上)

    三个项目

    • lab-67-netty-demo-server 项目:搭建 Netty 服务端。
    • lab-67-netty-demo-client 项目:搭建 Netty 客户端。
    • lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。

    另外,我们也会提供 Netty 常用功能的示例:

    • 心跳机制,实现服务端对客户端的存活检测。
    • 断线重连,实现客户端对服务端的重新连接。

    不哔哔,直接开干。

    友情提示:可能会胖友担心,没有 Netty 基础是不是无法阅读本文?!

    艿艿的想法,看!就硬看,按照代码先自己能搭建一下哈~文末,艿艿会提供一波 Netty 基础 入门的文章。

    2. 构建 Netty 服务端与客户端

    本小节,我们先来使用 Netty 构建服务端与客户端的核心代码,让胖友对项目的代码有个初始的认知。

    2.1 构建 Netty 服务端

    创建 lab-67-netty-demo-server 项目,搭建 Netty 服务端 。如下图所示:

    使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆哪吒的狗头~(上)

    项目结构

    下面,我们只会暂时看看 server 包下的代码,避免信息量过大,击穿胖友的秃头。

    2.1.1 NettyServer

    创建 NettyServer 类,Netty 服务端。代码如下:

    @Componentpublic class NettyServer {    private Logger logger = LoggerFactory.getLogger(getClass());    @Value("${netty.port}")    private Integer port;    @Autowired    private NettyServerHandlerInitializer nettyServerHandlerInitializer;    /**     * boss 线程组,用于服务端接受客户端的连接     */    private EventLoopGroup bossGroup = new NioEventLoopGroup();    /**     * worker 线程组,用于服务端接受客户端的数据读写     */    private EventLoopGroup workerGroup = new NioEventLoopGroup();    /**     * Netty Server Channel     */    private Channel channel;    /**     * 启动 Netty Server     */    @PostConstruct    public void start() throws InterruptedException {        // <2.1> 创建 ServerBootstrap 对象,用于 Netty Server 启动        ServerBootstrap bootstrap = new ServerBootstrap();        // <2.2> 设置 ServerBootstrap 的各种属性        bootstrap.group(bossGroup, workerGroup) // <2.2.1> 设置两个 EventLoopGroup 对象                .channel(NioServerSocketChannel.class)  // <2.2.2> 指定 Channel 为服务端 NioServerSocketChannel                .localAddress(new InetSocketAddress(port)) // <2.2.3> 设置 Netty Server 的端口                .option(ChannelOption.SO_BACKLOG, 1024) // <2.2.4> 服务端 accept 队列的大小                .childOption(ChannelOption.SO_KEEPALIVE, true) // <2.2.5> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能                .childOption(ChannelOption.TCP_NODELAY, true) // <2.2.6> 允许较小的数据包的发送,降低延迟                .childHandler(nettyServerHandlerInitializer);        // <2> 绑定端口,并同步等待成功,即启动服务端        ChannelFuture future = bootstrap.bind().sync();        if (future.isSuccess()) {            channel = future.channel();            logger.info("[start][Netty Server 启动在 {} 端口]", port);        }    }    /**     * 关闭 Netty Server     */    @PreDestroy    public void shutdown() {        // <3.1> 关闭 Netty Server        if (channel != null) {            channel.close();        }        // <3.2> 优雅关闭两个 EventLoopGroup 对象        bossGroup.shutdownGracefully();        workerGroup.shutdownGracefully();    }}复制代码

    ① 在类上,添加 @Component 注解,把 NettyServer 的创建交给 Spring 管理。

    • port 属性,读取 application.yml 配置文件的 netty.port 配置项。
    • #start() 方法,添加 @PostConstruct 注解,启动 Netty 服务器。
    • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 服务器。

    ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Server 的启动。

    <2.1> 处,创建 ServerBootstrap 类,Netty 提供的 服务器 的启动类,方便我们初始化 Server。

    <2.2> 处,设置 ServerBootstrap 的各种属性。

    友情提示:这里涉及较多 Netty 组件的知识,艿艿先以简单的语言描述,后续胖友在文末的 Netty 基础入门的文章,补充学噢。

    <2.2.1> 处,调用 #group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 方法,设置使用 bossGroup 和 workerGroup。其中:

    • bossGroup 属性: Boss 线程组,用于服务端接受客户端的 连接
    • workerGroup 属性: Worker 线程组,用于服务端接受客户端的 数据读写

    Netty 采用的是多 Reactor 多线程的模型,服务端可以接受 更多 客户端的数据读写的能力。原因是:

    创建专门用于接受 客户端连接 的 bossGroup 线程组,避免因为已连接的客户端的数据读写频繁,影响新的客户端的连接。

    创建专门用于接收 客户端读写 的 workerGroup 线程组, 多个 线程进行客户端的数据读写,可以支持更多客户端。

    课后习题:感兴趣的胖友,后续可以看看《【NIO 系列】——之 Reactor 模型》文章。

    <2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioServerSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Socket 实现类。

    <2.2.3> 处,调用 #localAddress(SocketAddress localAddress) 方法,设置服务端的 端口

    <2.2.4> 处,调用 option#(ChannelOption<T> option, T value) 方法,设置服务端接受客户端的 连接队列 大小。因为 TCP 建立连接是三次握手,所以第一次握手完成后,会添加到服务端的连接队列中。

    课后习题:更多相关内容,后续可以看看《浅谈 TCP Socket 的 backlog 参数》文章。

    <2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的 心跳保活 功能。

    课后习题:更多相关内容,后续可以看看《TCP Keepalive 机制刨根问底》文章。

    <2.2.6> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许 较小的数据包 的发送,降低延迟。

    课后习题:更多相关内容,后续可以看看《详解 Socket 编程 --- TCP_NODELAY 选项》文章。

    <2.2.7> 处,调用 #childHandler(ChannelHandler childHandler) 方法,设置客户端连接上来的 Channel 的处理器为 NettyServerHandlerInitializer。稍后我们在「2.1.2 NettyServerHandlerInitializer」小节来看看。

    <2.3> 处,调用 #bind() + #sync() 方法,绑定端口,并 同步 等待成功,即启动服务端。

    ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Server 的关闭。

    <3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Server,这样客户端就不再能连接了。

    <3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

    2.1.2 NettyServerHandlerInitializer

    在看 NettyServerHandlerInitializer 的代码之前,我们需要先了解下 Netty 的 ChannelHandler 组件,用来处理 Channel 的各种事件。这里的事件很广泛,比如可以是连接、数据读写、异常、数据转换等等。

    ChannelHandler 有非常多的子类,其中有个非常特殊的 ChannelInitializer,它用于 Channel 创建时,实现自定义的初始化逻辑。这里我们创建的 NettyServerHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,代码如下:

    @Componentpublic class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {    /**     * 心跳超时时间     */    private static final Integer READ_TIMEOUT_SECONDS = 3 * 60;    @Autowired    private MessageDispatcher messageDispatcher;    @Autowired    private NettyServerHandler nettyServerHandler;    @Override    protected void initChannel(Channel ch) {        // <1> 获得 Channel 对应的 ChannelPipeline        ChannelPipeline channelPipeline = ch.pipeline();        // <2> 添加一堆 NettyServerHandler 到 ChannelPipeline 中        channelPipeline                // 空闲检测                .addLast(new ReadTimeoutHandler(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS))                // 编码器                .addLast(new InvocationEncoder())                // 解码器                .addLast(new InvocationDecoder())                // 消息分发器                .addLast(messageDispatcher)                // 服务端处理器                .addLast(nettyServerHandler)        ;    }}复制代码

    在每一个客户端与服务端建立完成连接时,服务端会创建一个 Channel 与之对应。此时,NettyServerHandlerInitializer 会进行执行 #initChannel(Channel c) 方法,进行自定义的初始化。

    友情提示:创建的客户端的 Channel,不要和「2.1.1 NettyServer」小节的 NioServerSocketChannel 混淆,不是同一个哈。

    在 #initChannel(Channel ch) 方法的 ch参数,就是此时创建的客户端 Channel。

    ① <1> 处,调用 Channel 的 #pipeline() 方法,获得客户端 Channel 对应的 ChannelPipeline。ChannelPipeline 由一系列的 ChannelHandler 组成,又或者说是 ChannelHandler 。这样, Channel 所有上所有的事件都会经过 ChannelPipeline,被其上的 ChannelHandler 所处理。

    ② <2> 处,添加 五个 ChannelHandler 到 ChannelPipeline 中,每一个的作用看其上的注释。具体的,我们会在后续的小节详细解释。

    2.1.3 NettyServerHandler

    创建 NettyServerHandler 类,继承 ChannelInboundHandlerAdapter 类,实现客户端 Channel 建立 连接、 断开 连接、异常时的处理。代码如下:

    @Component@ChannelHandler.Sharablepublic class NettyServerHandler extends ChannelInboundHandlerAdapter {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private NettyChannelManager channelManager;    @Override    public void channelActive(ChannelHandlerContext ctx) {        // 从管理器中添加        channelManager.add(ctx.channel());    }    @Override    public void channelUnregistered(ChannelHandlerContext ctx) {        // 从管理器中移除        channelManager.remove(ctx.channel());    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);        // 断开连接        ctx.channel().close();    }}复制代码

    ① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

    ② channelManager 属性,是我们实现的客户端 Channel 的管理器。

    • #channelActive(ChannelHandlerContext ctx) 方法,在客户端和服务端 建立 连接完成时,调用 NettyChannelManager 的 #add(Channel channel) 方法,添加到 其中
    • #channelUnregistered(ChannelHandlerContext ctx) 方法,在客户端和服务端 断开 连接时,调用 NettyChannelManager 的 #add(Channel channel) 方法,从其中 移除

    具体的 NettyChannelManager 的源码,我们在「2.1.4 NettyChannelManager」 小节中来瞅瞅~

    ③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法, 断开 和客户端的连接。

    2.1.4 NettyChannelManager

    创建 NettyChannelManager 类,提供 两种 功能。

    ① 客户端 Channel 的 管理 。代码如下:

    @Componentpublic class NettyChannelManager {    /**     * {@link Channel#attr(AttributeKey)} 属性中,表示 Channel 对应的用户     */    private static final AttributeKey<String> CHANNEL_ATTR_KEY_USER = AttributeKey.newInstance("user");    private Logger logger = LoggerFactory.getLogger(getClass());    /**     * Channel 映射     */    private ConcurrentMap<ChannelId, Channel> channels = new ConcurrentHashMap<>();    /**     * 用户与 Channel 的映射。     *     * 通过它,可以获取用户对应的 Channel。这样,我们可以向指定用户发送消息。     */    private ConcurrentMap<String, Channel> userChannels = new ConcurrentHashMap<>();    /**     * 添加 Channel 到 {@link #channels} 中     *     * @param channel Channel     */    public void add(Channel channel) {        channels.put(channel.id(), channel);        logger.info("[add][一个连接({})加入]", channel.id());    }    /**     * 添加指定用户到 {@link #userChannels} 中     *     * @param channel Channel     * @param user 用户     */    public void addUser(Channel channel, String user) {        Channel existChannel = channels.get(channel.id());        if (existChannel == null) {            logger.error("[addUser][连接({}) 不存在]", channel.id());            return;        }        // 设置属性        channel.attr(CHANNEL_ATTR_KEY_USER).set(user);        // 添加到 userChannels        userChannels.put(user, channel);    }    /**     * 将 Channel 从 {@link #channels} 和 {@link #userChannels} 中移除     *     * @param channel Channel     */    public void remove(Channel channel) {        // 移除 channels        channels.remove(channel.id());        // 移除 userChannels        if (channel.hasAttr(CHANNEL_ATTR_KEY_USER)) {            userChannels.remove(channel.attr(CHANNEL_ATTR_KEY_USER).get());        }        logger.info("[remove][一个连接({})离开]", channel.id());    }}复制代码

    ② 向客户端 Channel 发送 消息 。代码如下:

    @Componentpublic class NettyChannelManager {    /**     * 向指定用户发送消息     *     * @param user 用户     * @param invocation 消息体     */    public void send(String user, Invocation invocation) {        // 获得用户对应的 Channel        Channel channel = userChannels.get(user);        if (channel == null) {            logger.error("[send][连接不存在]");            return;        }        if (!channel.isActive()) {            logger.error("[send][连接({})未激活]", channel.id());            return;        }        // 发送消息        channel.writeAndFlush(invocation);    }    /**     * 向所有用户发送消息     *     * @param invocation 消息体     */    public void sendAll(Invocation invocation) {        for (Channel channel : channels.values()) {            if (!channel.isActive()) {                logger.error("[send][连接({})未激活]", channel.id());                return;            }            // 发送消息            channel.writeAndFlush(invocation);        }    }}复制代码

    2.1.5 引入依赖

    创建 pom.xml 文件,引入 Netty 依赖。

    <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>lab-67-netty-demo</artifactId>        <groupId>cn.iocoder.springboot.labs</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>lab-67-netty-demo-server</artifactId>    <properties>        <!-- 依赖相关配置 -->        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>        <!-- 插件相关配置 -->        <maven.compiler.target>1.8</maven.compiler.target>        <maven.compiler.source>1.8</maven.compiler.source>    </properties>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-parent</artifactId>                <version>${spring.boot.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <dependencies>        <!-- Spring Boot 基础依赖 -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>        </dependency>        <!-- Netty 依赖 -->        <dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>            <version>4.1.50.Final</version>        </dependency>        <!-- 引入 netty-demo-common 封装 -->        <dependency>            <groupId>cn.iocoder.springboot.labs</groupId>            <artifactId>lab-67-netty-demo-common</artifactId>            <version>1.0-SNAPSHOT</version>        </dependency>    </dependencies></project>复制代码

    2.1.6 NettyServerApplication

    创建 NettyServerApplication 类,Netty Server 启动类。代码如下:

    @SpringBootApplicationpublic class NettyServerApplication {    public static void main(String[] args) {        SpringApplication.run(NettyServerApplication.class, args);    }}复制代码

    2.1.7 简单测试

    执行 NettyServerApplication 类,启动 Netty Server 服务器。日志如下:

    ... // 省略其他日志2020-06-21 00:16:38.801  INFO 41948 --- [           main] c.i.s.l.n.server.NettyServer             : [start][Netty Server 启动在 8888 端口]2020-06-21 00:16:38.893  INFO 41948 --- [           main] c.i.s.l.n.NettyServerApplication         : Started NettyServerApplication in 0.96 seconds (JVM running for 1.4)复制代码

    Netty Server 启动在 8888 端口。

    2.2 构建 Netty 客户端

    创建 lab-67-netty-demo-client 项目,搭建 Netty 客户端 。如下图所示:

    使用 Netty 实现 IM 聊天贼简单,看不懂就锤爆哪吒的狗头~(上)

    项目结构

    下面,我们只会暂时看看 client 包下的代码,避免信息量过大,击穿胖友的秃头。

    2.2.1 NettyClient

    创建 NettyClient 类,Netty 客户端。代码如下:

    @Componentpublic class NettyClient {    /**     * 重连频率,单位:秒     */    private static final Integer RECONNECT_SECONDS = 20;    private Logger logger = LoggerFactory.getLogger(getClass());    @Value("${netty.server.host}")    private String serverHost;    @Value("${netty.server.port}")    private Integer serverPort;    @Autowired    private NettyClientHandlerInitializer nettyClientHandlerInitializer;    /**     * 线程组,用于客户端对服务端的连接、数据读写     */    private EventLoopGroup eventGroup = new NioEventLoopGroup();    /**     * Netty Client Channel     */    private volatile Channel channel;    /**     * 启动 Netty Server     */    @PostConstruct    public void start() throws InterruptedException {        // <2.1> 创建 Bootstrap 对象,用于 Netty Client 启动        Bootstrap bootstrap = new Bootstrap();        // <2.2>        bootstrap.group(eventGroup) // <2.2.1> 设置一个 EventLoopGroup 对象                .channel(NioSocketChannel.class)  // <2.2.2> 指定 Channel 为客户端 NioSocketChannel                .remoteAddress(serverHost, serverPort) // <2.2.3> 指定连接服务器的地址                .option(ChannelOption.SO_KEEPALIVE, true) // <2.2.4> TCP Keepalive 机制,实现 TCP 层级的心跳保活功能                .option(ChannelOption.TCP_NODELAY, true) //<2.2.5>  允许较小的数据包的发送,降低延迟                .handler(nettyClientHandlerInitializer);        // <2.3> 连接服务器,并异步等待成功,即启动客户端        bootstrap.connect().addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                // 连接失败                if (!future.isSuccess()) {                    logger.error("[start][Netty Client 连接服务器({}:{}) 失败]", serverHost, serverPort);                    reconnect();                    return;                }                // 连接成功                channel = future.channel();                logger.info("[start][Netty Client 连接服务器({}:{}) 成功]", serverHost, serverPort);            }        });    }    public void reconnect() {        // ... 暂时省略代码。    }    /**     * 关闭 Netty Server     */    @PreDestroy    public void shutdown() {        // <3.1> 关闭 Netty Client        if (channel != null) {            channel.close();        }        // <3.2> 优雅关闭一个 EventLoopGroup 对象        eventGroup.shutdownGracefully();    }    /**     * 发送消息     *     * @param invocation 消息体     */    public void send(Invocation invocation) {        if (channel == null) {            logger.error("[send][连接不存在]");            return;        }        if (!channel.isActive()) {            logger.error("[send][连接({})未激活]", channel.id());            return;        }        // 发送消息        channel.writeAndFlush(invocation);    }}复制代码

    友情提示:整体代码,是和「2.1.1 NettyServer」对等,且基本是一致的。

    ① 在类上,添加 @Component 注解,把 NettyClient 的创建交给 Spring 管理。

    • serverHost 和 serverPort 属性,读取 application.yml 配置文件的 netty.server.host 和 netty.server.port 配置项。
    • #start() 方法,添加 @PostConstruct 注解,启动 Netty 客户端。
    • #shutdown() 方法,添加 @PreDestroy 注解,关闭 Netty 客户端。

    ② 我们来详细看看 #start() 方法的代码,如何实现 Netty Client 的启动,建立和服务器的连接。

    <2.1> 处,创建 Bootstrap 类,Netty 提供的 客户端 的启动类,方便我们初始化 Client。

    <2.2> 处,设置 Bootstrap 的各种属性。

    <2.2.1> 处,调用 #group(EventLoopGroup group) 方法,设置使用 eventGroup 线程组,实现客户端对服务端的连接、数据读写。

    <2.2.2> 处,调用 #channel(Class<? extends C> channelClass) 方法,设置使用 NioSocketChannel 类,它是 Netty 定义的 NIO 服务端 TCP Client 实现类。

    <2.2.3> 处,调用 #remoteAddress(SocketAddress localAddress) 方法,设置连接服务端的 地址

    <2.2.4> 处,调用 #option(ChannelOption<T> childOption, T value) 方法,TCP Keepalive 机制,实现 TCP 层级的 心跳保活 功能。

    <2.2.5> 处,调用 #childOption(ChannelOption<T> childOption, T value) 方法,允许 较小的数据包 的发送,降低延迟。

    <2.2.7> 处,调用 #handler(ChannelHandler childHandler) 方法,设置 自己 Channel 的处理器为 NettyClientHandlerInitializer。稍后我们在「2.2.2 NettyClientHandlerInitializer」小节来看看。

    <2.3> 处,调用 #connect() 方法,连接服务器,并 异步 等待成功,即启动客户端。同时,添加回调监听器 ChannelFutureListener,在连接服务端失败的时候,调用 #reconnect() 方法,实现定时重连。 具体 #reconnect() 方法的代码,我们稍后在瞅瞅哈。

    ③ 我们来详细看看 #shutdown() 方法的代码,如何实现 Netty Client 的关闭。

    <3.1> 处,调用 Channel 的 #close() 方法,关闭 Netty Client,这样客户端就断开和服务端的连接。

    <3.2> 处,调用 EventLoopGroup 的 #shutdownGracefully() 方法,优雅关闭 EventLoopGroup。例如说,它们里面的线程池。

    ④ #send(Invocation invocation) 方法,实现向服务端发送消息。

    因为 NettyClient 是客户端,所以无需像 NettyServer 一样使用「2.1.4 NettyChannelManager」维护 Channel 的集合。

    2.2.2 NettyClientHandlerInitializer

    创建的 NettyClientHandlerInitializer 类,就继承了 ChannelInitializer 抽象类,实现和服务端建立连接后,添加相应的 ChannelHandler 处理器。代码如下:

    @Componentpublic class NettyClientHandlerInitializer extends ChannelInitializer<Channel> {    /**     * 心跳超时时间     */    private static final Integer READ_TIMEOUT_SECONDS = 60;    @Autowired    private MessageDispatcher messageDispatcher;    @Autowired    private NettyClientHandler nettyClientHandler;    @Override    protected void initChannel(Channel ch) {        ch.pipeline()                // 空闲检测                .addLast(new IdleStateHandler(READ_TIMEOUT_SECONDS, 0, 0))                .addLast(new ReadTimeoutHandler(3 * READ_TIMEOUT_SECONDS))                // 编码器                .addLast(new InvocationEncoder())                // 解码器                .addLast(new InvocationDecoder())                // 消息分发器                .addLast(messageDispatcher)                // 客户端处理器                .addLast(nettyClientHandler)        ;    }}复制代码

    和「2.1.2 NettyServerHandlerInitializer」的代码基本一样,差别在于空闲检测额外增加 IdleStateHandler ,客户端处理器换成了 NettyClientHandler

    2.2.3 NettyClientHandler

    创建 NettyClientHandler 类,实现客户端 Channel 断开 连接、异常时的处理。代码如下:

    @Component@ChannelHandler.Sharablepublic class NettyClientHandler extends ChannelInboundHandlerAdapter {    private Logger logger = LoggerFactory.getLogger(getClass());    @Autowired    private NettyClient nettyClient;    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        // 发起重连        nettyClient.reconnect();        // 继续触发事件        super.channelInactive(ctx);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        logger.error("[exceptionCaught][连接({}) 发生异常]", ctx.channel().id(), cause);        // 断开连接        ctx.channel().close();    }    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {        // 空闲时,向服务端发起一次心跳        if (event instanceof IdleStateEvent) {            logger.info("[userEventTriggered][发起一次心跳]");            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();            ctx.writeAndFlush(new Invocation(HeartbeatRequest.TYPE, heartbeatRequest))                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);        } else {            super.userEventTriggered(ctx, event);        }    }}复制代码

    ① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

    ② #channelInactive(ChannelHandlerContext ctx) 方法,实现在和服务端 断开 连接时,调用 NettyClient 的 #reconnect() 方法,实现客户端 定时 和服务端 重连

    ③ #exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 方法,在处理 Channel 的事件发生异常时,调用 Channel 的 #close() 方法, 断开 和客户端的连接。

    ④ #userEventTriggered(ChannelHandlerContext ctx, Object event) 方法,在客户端在空闲时,向服务端发送一次心跳,即 心跳机制 。这块的内容,我们稍后详细讲讲。

    2.2.4 引入依赖

    创建 pom.xml 文件,引入 Netty 依赖。

    <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <parent>        <artifactId>lab-67-netty-demo</artifactId>        <groupId>cn.iocoder.springboot.labs</groupId>        <version>1.0-SNAPSHOT</version>    </parent>    <modelVersion>4.0.0</modelVersion>    <artifactId>lab-67-netty-demo-client</artifactId>    <properties>        <!-- 依赖相关配置 -->        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>        <!-- 插件相关配置 -->        <maven.compiler.target>1.8</maven.compiler.target>        <maven.compiler.source>1.8</maven.compiler.source>    </properties>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-starter-parent</artifactId>                <version>${spring.boot.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <dependencies>        <!-- 实现对 Spring MVC 的自动化配置 -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <!-- Netty 依赖 -->        <dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>            <version>4.1.50.Final</version>        </dependency>        <!-- 引入 netty-demo-common 封装 -->        <dependency>            <groupId>cn.iocoder.springboot.labs</groupId>            <artifactId>lab-67-netty-demo-common</artifactId>            <version>1.0-SNAPSHOT</version>        </dependency>    </dependencies></project>复制代码

    2.2.5 NettyClientApplication

    创建 NettyClientApplication 类,Netty Client 启动类。代码如下:

    @SpringBootApplicationpublic class NettyClientApplication {    public static void main(String[] args) {        SpringApplication.run(NettyClientApplication.class, args);    }}复制代码

    2.2.6 简单测试

    执行 NettyClientApplication 类,启动 Netty Client 客户端。日志如下:

    ... // 省略其他日志2020-06-21 09:06:12.205  INFO 44029 --- [ntLoopGroup-2-1] c.i.s.l.n.client.NettyClient             : [start][Netty Client 连接服务器(127.0.0.1:8888) 成功]复制代码

    同时 Netty Server 服务端发现有一个客户端接入,打印如下日志:

    2020-06-21 09:06:12.268  INFO 41948 --- [ntLoopGroup-3-1] c.i.s.l.n.server.NettyChannelManager     : [add][一个连接(db652822)加入]复制代码

    2.3 小结

    至此,我们已经构建 Netty 服务端和客户端完成。因为 Netty 提供的 API 非常便利,所以我们不会像直接使用 NIO 时,需要处理大量底层且细节的代码。

    不过,如上的内容仅仅是本文的开胃菜,正片即将开始!美滋滋,继续往下看,奥利给!

    原文  https://juejin.im/post/5f080f29f265da22f1472a8f
    正文到此结束
    Loading...