继续编写 Server
端代码,接下来是 handler
在 IoHandlerAdapter
中有以下方法。
看名字就可以看出,有处理异常、接收消息,发送消息、连接打开,连接关闭、进入空闲状态等方法。
我们可以根据自己的实际情况复写父类中的方法。在 MinaServerHandler
中,我们复写了以下方法:
最好复写 exceptionCaught
这个方法,不然出现异常,连接关闭的时候,你可能无从下手。
作为服务器,在 messageReceived
中处理客户端发出的请求。当客户端请求一次后,会把客户端连接信息保存下来,用来推送数据。
可以调用 messageSent
这个方法,向客户端发送信息。
在 sessionIdle
这个方法中,处理连接超时的情况,比如在一定时间内没有发送心跳包,关闭客户端连接。
package com.lww.mina.handler;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lww.mina.dao.MessageDao;
import com.lww.mina.domain.Message;
import com.lww.mina.protocol.MessagePack;
import com.lww.mina.session.SessionManager;
import com.lww.mina.util.Const;
import com.lww.mina.util.SpringBeanFactoryUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
/**
* 处理客户端发送的消息
*
* @author lww
* @date 2020-07-06 22:53
*/
@Slf4j
public class MinaServerHandler extends IoHandlerAdapter {
@Override
public void sessionCreated(IoSession session) {
InetSocketAddress isa = (InetSocketAddress) session.getRemoteAddress();
String ip = isa.getAddress().getHostAddress();
session.setAttribute("ip", ip);
log.info("来自" + ip + " 的终端上线,sessionId:" + session.getId());
}
@Override
public void sessionClosed(IoSession session) {
log.info(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + "sessionClosed ");
// 移除 属性
session.removeAttribute(Const.SESSION_KEY);
// 移除超时属性
session.removeAttribute(Const.TIME_OUT_KEY);
String key = (String) session.getAttribute(Const.SESSION_KEY);
if (key != null) {
SessionManager.removeSession(key);
}
session.closeNow();
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
if (session.getAttribute(Const.TIME_OUT_KEY) == null) {
session.closeNow();
log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + " time_out_key null");
return;
}
try {
int isTimeoutNum = (int) session.getAttribute(Const.TIME_OUT_KEY);
isTimeoutNum++;
// 没有超过最大次数,超时次数加1
if (isTimeoutNum < Const.TIME_OUT_NUM) {
session.setAttribute(Const.TIME_OUT_KEY, isTimeoutNum);
} else {
// 超过最大次数,关闭会话连接
String key = (String) session.getAttribute(Const.SESSION_KEY);
// 移除device属性
session.removeAttribute(Const.SESSION_KEY);
// 移除超时属性
session.removeAttribute(Const.TIME_OUT_KEY);
SessionManager.removeSession(key);
session.closeOnFlush();
log.info("client user: " + key + " more than " + Const.TIME_OUT_NUM + " times have no response, connection closed!");
}
} catch (Exception e) {
log.error(session.getAttribute(Const.SESSION_KEY) + " nid: " + session.getId() + e.getMessage());
session.closeNow();
}
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
log.error("终端用户:{} 连接发生异常,即将关闭连接,原因:{}", session.getAttribute(Const.SESSION_KEY), cause);
}
@Override
public void messageReceived(IoSession session, Object message) {
SocketAddress remoteAddress = session.getRemoteAddress();
log.info("server received MinaServerHandler_messageReceived_remoteAddress:{}", remoteAddress);
MessagePack pack = (MessagePack) message;
MessagePack response;
String body = pack.getBody();
if (StringUtils.isBlank(body)) {
log.error("ServerHandler_messageReceived_body:{}", body);
response = new MessagePack(Const.BASE, "body empty");
session.write(response);
session.close(false);
return;
}
Message msg = JSONObject.parseObject(body, Message.class);
if (msg == null) {
log.error("ServerHandler_messageReceived_body:{}", body);
response = new MessagePack(Const.BASE, "message empty");
session.write(response);
session.close(false);
return;
}
if (Const.CONF.equalsIgnoreCase(msg.getPropertyValue()) && pack.getModule() == Const.BASE) {
log.info("ServerHandler_messageReceived_Susccess:{}", msg.getPropertyValue());
response = new MessagePack(pack.getModule(), body);
session.write(response);
return;
}
final String key = remoteAddress.toString();
//存储的key
session.setAttribute(Const.SESSION_KEY, key);
// 超时次数设为0
session.setAttribute(Const.TIME_OUT_KEY, 0);
synchronized (this) {
IoSession oldSession = SessionManager.getSession(key);
if (oldSession != null && !oldSession.equals(session)) {
// 移除key属性
oldSession.removeAttribute(Const.SESSION_KEY);
// 移除超时时间
oldSession.removeAttribute(Const.TIME_OUT_KEY);
// 替换oldSession
SessionManager.replaceSession(key, session);
session.closeOnFlush();
log.info("oldSession close!");
}
if (oldSession == null) {
SessionManager.addSession(key, session);
}
log.info("bind success: " + session.getRemoteAddress());
}
MessageDao minaMessageDao = SpringBeanFactoryUtils.getApplicationContext().getBean(MessageDao.class);
log.info("ServerHandler_messageReceived_projectName:{}, propertityValue:{}, envValue:{}", msg.getProjectName(), msg.getPropertyValue(), msg.getEnvValue());
Message configMessage = minaMessageDao.selectOne(new QueryWrapper<Message>().lambda()
.eq(Message::getProjectName, msg.getProjectName())
.eq(Message::getPropertyValue, msg.getPropertyValue())
.eq(Message::getEnvValue, msg.getEnvValue()));
if (configMessage == null && !msg.getPropertyValue().equalsIgnoreCase(Const.CONF)) {
log.error(session.toString() + "select null");
response = new MessagePack(Const.BASE, "select error");
session.write(response);
session.closeOnFlush();
} else {
// 设置session key
if (configMessage != null) {
configMessage.setRemoteAddress(key);
// AR模式
boolean updateSessionKey = configMessage.updateById();
log.info("ServerHandler_messageReceived_updateSessionKey:{}", updateSessionKey);
}
log.info(session.toString() + " succeed!");
response = new MessagePack(pack.getModule(), JSONObject.toJSONString(configMessage));
session.write(response);
}
}
@Override
public void messageSent(IoSession session, Object message) {
if (message instanceof Message) {
Message minaMessage = (Message) message;
session.write(new MessagePack(Const.CONFIG_MANAGE, JSONObject.toJSONString(minaMessage)));
}
session.setAttribute(Const.TIME_OUT_KEY, 0);
log.info("发送消息成功");
}
}
复制代码
上面的代码里出现了这个类 SessionManager
,这是一个管理 Session
的工具类,我尝试过把 Session
转为Json存储,可是会报异常,无法转为Json,序列化也不可行。
但是看文档,又是可以持久化的。
org.apache.mina.core.service.AbstractIoService#initSession
org.apache.mina.core.session.IoSessionDataStructureFactory
看代码也是定义了一个Map: org.apache.mina.core.session.IoSessionAttributeMap
,
使用这个类管理Session。
不过因为我们是配置中心,所以可以使用Map来存储到内存中,因为客户端数量不会很多。我用了一个线程安全的 ConcurrentHashMap
来存储 Session
对象,key就是客户端的连接信息。
这个样子: /127.0.0.1:55618
。
package com.lww.mina.session;
import com.lww.mina.util.Const;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.session.IoSession;
/**
* @author lww
* @date 2020-07-06 23:21
*/
public class SessionManager {
/**
* 存放session的线程安全的map集合
*/
private static ConcurrentHashMap<String, IoSession> sessions = new ConcurrentHashMap<>();
/**
* 线程安全的自增类,用于统计连接数
*/
private static final AtomicInteger CONNECTIONS_COUNTER = new AtomicInteger(0);
/**
* 添加session
*/
public static void addSession(String account, IoSession session) {
sessions.put(account, session);
CONNECTIONS_COUNTER.incrementAndGet();
}
/**
* 获取session
*/
public static IoSession getSession(String key) {
return sessions.get(key);
}
/**
* 替换session,通过key
*/
public static void replaceSession(String key, IoSession session) {
sessions.put(key, session);
}
/**
* 移除session通过key
*/
public static void removeSession(String key) {
sessions.remove(key);
CONNECTIONS_COUNTER.decrementAndGet();
}
/**
* 移除session通过session
*/
public static void removeSession(IoSession session) {
String key = (String) session.getAttribute(Const.SESSION_KEY);
removeSession(key);
}
public static ConcurrentHashMap<String, IoSession> getSessions() {
return sessions;
}
}
复制代码
客户端每次连接服务器,都会在 message
表中更新连接信息,当连接不断,IP和端口是不会改变的,服务器也可以拿着这个 Session
和客户端通信,而且客户端断开重连,客户端的端口每次都可能是不一样的,存在 Map
中也可以方便的管理。
可能有些人还记得我之前写过的 SpringBoot事件发布与订阅 ,在框架中,这个确实很常用,SpringBoot的源码中到处用到了事件的发布与订阅。
先说一下配置更新推送的原理,我是在更新的时候发布了一个事件
SpringBeanFactoryUtils.getApplicationContext().publishEvent(new MessageEvent(afterMessage));
事件类
package com.lww.mina.event;
import com.lww.mina.domain.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
/**
* 消息更新事件
*
* @author lww
* @date 2020-07-07 00:28
*/
@Slf4j
public class MessageEvent extends ApplicationEvent {
private Message message;
public MessageEvent(Message message) {
super(message);
log.info("发布消息 MessageEvent_MessageEvent_message:{}", message);
this.message = message;
}
public Message getMessage() {
return message;
}
}
复制代码
然后用 MessageChangeListener
监听这个事件,从 message
中取出客户端连接信息,然后作为 key
从 map
中取到对应的 Session
,通过 Session
发送消息给客户端。
MessageChangeListener
package com.lww.mina.listener;
import com.lww.mina.domain.Message;
import com.lww.mina.event.MessageEvent;
import com.lww.mina.handler.MinaServerHandler;
import com.lww.mina.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.session.IoSession;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
/**
* @author lww
* @date 2020-07-07 00:30
*/
@Slf4j
@Component
public class MessageChangeListener {
@EventListener
public void onApplicationEvent(MessageEvent event) {
log.info("接收事件 MessageChangeListener_onApplicationEvent_event:{}", event);
//推送配置
Message message = event.getMessage();
Assert.isTrue(StringUtils.isNotBlank(message.getRemoteAddress()), "初始配置无法发送配置信息,需要客户端连接一次后,获取客户端地址端口等信息!");
try {
IoSession session = SessionManager.getSession(message.getRemoteAddress());
if (session != null) {
MinaServerHandler handler = new MinaServerHandler();
handler.messageSent(session, message);
}
} catch (Exception e) {
log.info("MessageChangeListener_onApplicationEvent_e:{}", e);
}
}
}
复制代码
到这里, Server
端已经完成的差不多了,至于 Controller
和 Service
里的业务代码,就不粘了,都是些普通的CRUD,有一些地方用到了 Mybatis-Plus
的AR模式,确实很好用。
不过 Controller
和 Service
的代码我都会提交到 GitHub
的,感兴趣的可以去 GitHub
看一下。
Server
端完成了,接下来就是 Client
了。先说一下,在 Client
里,如果是和 Server
重复的或者类似的,我会简单说一下或者一笔带过。毕竟 Client
里要讲的东西太多了。很多黑科技哦,敬请期待!
还有一点,我们要把项目打包,发布到 maven
仓库,在 client
中引入这个 mina-base
模块,当然可以申请发布到 maven
中央仓库,不过为了节省时间,我自己搭建了一个私人 maven
仓库,在 pom.xml
中配置仓库,就可以引入我的 mina-base
包了。当然也可以把项目打成 Jar
包,然后作为第三方包引入。
这个服务器是我买的最低配版的服务器,大家就只在这里用一下好了:joy:
<repositories>
<repository>
<id>public</id>
<name>Team Maven Repository</name>
<url>http://148.70.249.148:8081/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
复制代码
为了方便调试,还要把代码一起提交到仓库。在 pom.xml
中添加下面的配置
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
复制代码
使用这个命令就可以把代码发布到仓库了。
mvn clean deploy -Dmaven.test.skip=true -Dmaven.javadoc.skip=true
Server端现在还有一些问题,会在第五章解决。
项目源码
欢迎大家关注我的公众号,共同学习,一起进步。加油
本文使用 mdnice 排版