转载

基于Mina的配置中心(三)

基于Mina的配置中心(三)

在第二章里我们已经自定义了包 MessagePack 。接下来我们要定义编码器和解码器。

  • 编码器: 把 java 对象转为二进制编码,因为在网络中传输的是二进制数据。
  • 解码器:把二进制数据转为 java 对象,也就是编码的逆向过程。

编码解码器工厂 MessageProtocolCodecFactory

首先我们要自定义一个编码器工厂,就像 TextLineCodecFactory 一样。由于这个写法是固定的,所以就不放代码了,具体可以到 GitHub 查看源代码。

编码器 MessageProtocolEncoder

这个东西的写法也是固定的。唯一不同的地方就是放置数据。

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
    MessagePack pack = (MessagePack) message;
    //设置缓冲区大小,并自动增长
    IoBuffer ioBuffer = IoBuffer.allocate(pack.getLen()).setAutoExpand(true);
    log.info("MessageProtocolEncoder_encode_length:{}", pack.getLen());
    //放置长度
    ioBuffer.putInt(pack.getLen());
    //放置模块代码
    ioBuffer.putInt(pack.getModule());
    if (StringUtils.isNotBlank(pack.getBody())) {
        log.info("MessageProtocolEncoder_encode_length:{}", pack.getBody().getBytes().length);
        //放置字节数组
        ioBuffer.putString(pack.getBody(), charset.newEncoder());
    }
    ioBuffer.flip();
    out.write(ioBuffer);
}
复制代码

我们把从 MessagePack 中取出的数据,按照顺序放到 IoBuffer 中,然后使用 out.write(ioBuffer); 把消息写出去,其实是把二进制数据存储到了一个队列中 Queue<Object> messageQueue

其实是在这里把消息发出去的 org.apache.mina.filter.codec.ProtocolCodecFilter#filterWrite

基于Mina的配置中心(三)

解码器 MessageProtocolDecoder

其实解码器会麻烦一点,我们脑袋里要有这个包的模型,包的开头是length(总长度),然后是module(模块代码),最后是Json字符串(Message),在解码时,还要判断一下是否是完整的包。

像下面这样:

@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    // 包头的长度
    // 拆包时,如果可读数据的长度小于包头的长度,就不进行读取
    if (in.remaining() < MessagePack.PACK_HEAD_LEN) {
        return false;
    } else {
        //标记当前position,以便后继的reset操作能恢复position位置
        in.mark();
        // 获取总长度
        int length = in.getInt();
        //获取模块代码
        int module = in.getInt();
        log.info("CustomProtocolDecoder_doDecode_length:{}, module:{}", length, module);
        // 如果可读取数据的长度 小于 总长度 - 包头的长度 ,则结束拆包,等待下一次
        if (in.remaining() < (length - MessagePack.PACK_HEAD_LEN)) {
            in.reset();
            return false;
        } else {
            //重置回复position位置到操作前  并读取一条完整记录
            in.reset();
            byte[] bytes = new byte[length];
            // 获取长度4个字节、模块4个字节、内容,即获取完整消息
            in.get(bytes, 0, length);
            String content = new String(bytes, MessagePack.PACK_HEAD_LEN, length - MessagePack.PACK_HEAD_LEN, charset);
            // 封装为自定义的java对象
            MessagePack pack = new MessagePack(module, content);
            out.write(pack);
            // 如果读取一条记录后,还存在数据(粘包),则再次进行调用
            return in.remaining() > 0;
        }
    }
}
复制代码

MessageProtocolCodecFactory 中,引入这两个类。编码解码器完成,现在需要把它配置到 Mina 的配置类中。

MinaServerConfig 中添加下面代码。

/**
 * 编解码器filter
 */
@Bean
public ProtocolCodecFilter protocolCodecFilter() {
    return new ProtocolCodecFilter(new MessageProtocolCodecFactory());
}
复制代码

心跳检测

心跳检测简单点说就是客户端每隔一段时间,向服务器发送一个消息,也就是心跳包,让服务器知道连接状态没有问题,客户端正常在线。如果没有发送,在一定时间内超过指定次数,服务器会认为客户端掉线了,为了节省资源,会关闭连接。

心跳检测一般有下面几种类型:

  1. 活跃型: 当心跳请求包被接受到后,立即发出心跳反馈。

  2. 半活跃型:发送心跳请求,不在乎有没有心跳反馈。但是接收到心跳请求后,也会立即发出心跳反馈。

  3. 聋子型:主动发送心跳请求,不想发送任何心跳反馈,但是接收到心跳请求后,也会立即发出心跳反馈。

  4. 持续监听型:既不想发送心跳请求也不想发送心跳反馈。

这里我们使用被动型,服务器接受客户端心跳请求,当在规定时间内没有收到时 将客户端连接关闭。

package com.lww.mina.filter;

import com.alibaba.fastjson.JSONObject;
import com.lww.mina.protocol.MessagePack;
import com.lww.mina.util.Const;
import lombok.extern.slf4j.Slf4j;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;

/**
 * 被动型心跳机制
 *
 * @author lww
 * @date 2020-07-06 22:40
 */
@Slf4j
public class ServerKeepAliveFactoryImpl implements KeepAliveMessageFactory {

    /**
     * 用来判断接收到的消息是不是一个心跳请求包,是就返回true[接收端使用]
     */
    @Override
    public boolean isRequest(IoSession session, Object message) {
        if (message instanceof MessagePack) {
            MessagePack pack = (MessagePack) message;
            if (pack.getModule() == Const.HEART_BEAT) {
                log.info("收到 心跳请求 ServerKeepAliveFactoryImpl_isRequest_pack:{}", JSONObject.toJSONString(message));
                return true;
            }
        }
        return false;
    }

    /**
     * 用来判断接收到的消息是不是一个心跳回复包,是就返回true[发送端使用]
     */
    @Override
    public boolean isResponse(IoSession session, Object message) {
        return false;
    }

    /**
     * 在需要发送心跳时,用来获取一个心跳请求包[发送端使用]
     */
    @Override
    public Object getRequest(IoSession session) {
        return null;
    }

    /**
     * 在需要回复心跳时,用来获取一个心跳回复包[接收端使用]
     */
    @Override
    public Object getResponse(IoSession session, Object message) {
        MessagePack pack = (MessagePack) message;
        // 将超时次数置为0
        session.setAttribute(Const.TIME_OUT_KEY, 0);
        log.info("响应 心跳请求 ServerKeepAliveFactoryImpl_getResponse_request:{}", JSONObject.toJSONString(message));
        return new MessagePack(Const.HEART_BEAT, "heart");
    }
}
复制代码

然后同样把心跳检测也配置到 MinaServerConfig 中。

/**
 * 心跳检测
 */
@Bean
public ServerKeepAliveFactoryImpl keepAliveFactoryImpl() {
    return new ServerKeepAliveFactoryImpl();
}

/**
 * 心跳filter
 */
@Bean
public KeepAliveFilter keepAliveFilter(ServerKeepAliveFactoryImpl keepAliveFactory) {
    // 注入心跳工厂,读写空闲
    KeepAliveFilter filter = new KeepAliveFilter(keepAliveFactory, IdleStatus.BOTH_IDLE);
    // 设置是否forward到下一个filter
    filter.setForwardEvent(true);
    // 设置心跳频率 5秒一次
    filter.setRequestInterval(Const.HEART_BEAT_RATE);
    return filter;
}
复制代码

最后将过滤器注入到 mina 的链式管理器中,还有开启 minaserver 服务,并设置对应的参数。

/**
 * 将过滤器注入到mina的链式管理器中
 */
@Bean
public DefaultIoFilterChainBuilder defaultIoFilterChainBuilder(ExecutorFilter executorFilter,
        LoggingFilter loggingFilter, ProtocolCodecFilter protocolCodecFilter, KeepAliveFilter keepAliveFilter) {
    DefaultIoFilterChainBuilder chainBuilder = new DefaultIoFilterChainBuilder();
    Map<String, IoFilter> filters = new LinkedHashMap<>();
    //多线程过滤器
    filters.put("executor", executorFilter);
    //日志
    filters.put("logger", loggingFilter);
    //编码 解码
    filters.put("codec", protocolCodecFilter);
    //心跳
    filters.put("keepAliveFilter", keepAliveFilter);
    chainBuilder.setFilters(filters);
    return chainBuilder;
}

/**
 * 开启mina的server服务,并设置对应的参数
 */
@Bean
public IoAcceptor ioAcceptor(DefaultIoFilterChainBuilder filterChainBuilder) throws IOException {
    IoAcceptor acceptor = new NioSocketAcceptor();
    //设置缓冲区大小
    acceptor.getSessionConfig().setReadBufferSize(config.getReadBufferSize());
    //设置空闲状态时间,10秒没操作就进入空闲状态
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut());
    //过滤器链
    acceptor.setFilterChainBuilder(filterChainBuilder);
    //处理器 这个 handler 处理所有的连接事件
    acceptor.setHandler(new MinaServerHandler());
    //绑定地址
    acceptor.bind(new InetSocketAddress(config.getAddress(), config.getPort()));
    return acceptor;
}
复制代码

这里有一个问题:

//设置空闲状态时间,10秒没操作就进入空闲状态
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut());
复制代码

这个空闲时间是没有生效的,因为使用了心跳检测,空闲状态时间就是心跳检测的时间,所以也就是5秒。

配置类已经基本完成了,还有一个 MinaServerHandler ,这个 handler 就是处理客户端消息的处理器。

可以先创建出来,这样配置类不会报错。

package com.lww.mina.handler;

import org.apache.mina.core.service.IoHandlerAdapter;

/**
 * 处理客户端发送的消息
 *
 * @author lww
 * @date 2020-07-06 22:53
 */
public class MinaServerHandler extends IoHandlerAdapter {

}
复制代码

最后结构图:

基于Mina的配置中心(三)

总结

配置类基本完成了,我们这样配置了编码解码、心跳检测后,Mina会自动调用,是不是简单了很多?当然还剩下一个 handler ,已经粘了太多代码了,第四章再继续吧。

第四章会完成 handler ,还有 Session 管理,还有当配置更新时,推到客户端。完成了这些,基本上 Server 端就差不多完成了,然后会写 Client 端, Client 才是含金量更高的东西。敬请期待!

本次的代码没有全部粘出来,有兴趣的可以去 Github 查看。

项目源码

欢迎大家关注我的公众号,共同学习,一起进步。加油

基于Mina的配置中心(三)

本文使用 mdnice 排版

原文  https://juejin.im/post/5f04a91ee51d45347500a896
正文到此结束
Loading...