title: Mina服务端客户端通信
date: 2018-09-30 09:00:30
tags:
- [mina] - [tcp]
categories:
- [编程]
[TOC]
前两章节已经完整的介绍了理论部分,今天我们就利用这些理论来实现tcp协议的c/s 通信。首先我们简单回顾下之前的介绍,
在mina中我们的客户端和服务端简直就是一模一样,只是我们用不同适配器。但是他的数据处理流程是一样的。今天我们就重点看看如何建立服务端、客户端
并且处理两者之间的消息通信处理
服务端和客户端不同的就是我们创建的监听对象不同而已,客户端发送消息到服务端,服务端需要经历过滤器的处理才能到达消息中心,但是在过滤器中我们就需要将消息进行解码,然后才会到消息接收的地方处理我们的业务。正常情况下我们处理完消息需要对客户端进行回应。回应的时候也会经历过滤器中的编码逻辑,进行数据编码然后发送。信息发送到客户端我们可以看成服务端的方向。也是需要进行编解码的。下面看看服务端的创建代码
//创建监听对象 IoAcceptor acceptor = new NioSocketAcceptor(); TextLineCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()); //添加过滤器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter( textLineCodecFactory )); //设置时间处理的handler acceptor.setHandler(new ServerMessageHandler()); //设置读取数据缓存区的大小 acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE); //设置多久没有消息就进入空闲状态 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME); //绑定端口 try { acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT)); } catch (IOException e) { logger.error(String.format("bind %s error",Constaint.REMOTE_PORT)); e.printStackTrace(); } logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));
//创建监听对象 IoConnector connector = new NioSocketConnector(); TextLineCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()); //添加过滤器 //日志过滤器 。 sltf日志设置 connector.getFilterChain().addLast("logger",new LoggingFilter()); //在这个过滤器中提供了编解码,这里的编码是以信息中已/r/n结尾算是一条信息 connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter( new SocketFactory() )); //设置时间处理的handler , 提供session生命周期的监听函数,消息接受,发送的函数 connector.setHandler(new ClientMessageHandler()); //设置读取数据缓存区的大小 connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE); //设置多久没有消息就进入空闲状态 connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME); ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT)); //是异步处理,这里不会造成阻塞 future.addListener(new IoFutureListener<IoFuture>() { @Override public void operationComplete(IoFuture ioFuture) { logger.info("连接准备完成"); IoSession session = ioFuture.getSession(); } });
private static Logger logger = LogManager.getLogger(ServerMessageHandler.class); public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); logger.info("sessionCreated"); } public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); try { IoBuffer buffer = IoBuffer.allocate(30); buffer.clear(); buffer.putString("quit/r/n", Charset.forName("utf-8").newEncoder()); buffer.flip(); session.write(buffer); } catch (Exception e) { logger.error(e.toString()); } logger.info("sessionOpened"); } public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); logger.info("sessionClosed"); } public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception { super.sessionIdle(session,idleStatus); try { IoBuffer buffer = IoBuffer.allocate(30); buffer.clear(); buffer.putString("quit/r/n", Charset.forName("utf-8").newEncoder()); buffer.flip(); session.write(buffer); } catch (Exception e) { logger.error(e.toString()); } // logger.info("sessionIdle"); } public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception { logger.info("exceptionCaught"); throwable.printStackTrace(); } public void messageReceived(IoSession session, Object message) throws Exception { super.messageReceived(session, message); String info = message.toString(); Date date = new Date(System.currentTimeMillis()); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss"); String time = sdf.format(date); session.write(time); System.out.println("接收到的消息:"+info); } public void messageSent(IoSession session, Object message) throws Exception { super.messageSent(session, message); logger.info("messageSent"); }
public class SocketFactory implements ProtocolCodecFactory { private MessageDecoder decoder; private MessageEncoder encoder; public SocketFactory() { decoder = new MessageDecoder(); encoder = new MessageEncoder(); } public ProtocolDecoder getDecoder(IoSession session) throws Exception { return this.decoder; } public ProtocolEncoder getEncoder(IoSession session) throws Exception { return this.encoder; } }
解码器写好之后只需要在上面自定义工厂中创建就好了。至于自定义编码器只需要继承CumulativeProtocolDecoder这个类就好了。而且复写doDecode方法就好了。这个方法的返回值是boolean类型。返回值不同代表意义不一。这里需要重点理清楚
3、数据比一条完整信息(粘包)多,那么我们处理到一条信息后也需要返回true,但是CumulativeProtocolDecoder会将剩余的缓存继续拼装,剩余消息就相当于内部进行了第二次解码。如果不过那么相当于上面第一种情况
记住三种情况 半包 、 正常 、 粘包
public class MessageDecoder extends CumulativeProtocolDecoder { /** * 此方法return true : 表示父类中CumulativeProtocolDecoder会不断的调用此方法进行消息的消费 * return false: 表示消息已经消费完全了,缓存中就算有数据也不会再消费了。等待再次客户端 * 发送消息时会触发消息发送接口,此时会将新旧消息拼接再一起进行处理 * @param ioSession * @param ioBuffer * @param protocolDecoderOutput * @return * @throws Exception */ @Override protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); while (ioBuffer.hasRemaining()) { if (ioBuffer.remaining()<3) { //继续接受 return false; } //获取三个字节 int oldLimit = ioBuffer.limit(); ioBuffer.limit(ioBuffer.position()+3); String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder()); protocolDecoderOutput.write(text); ioBuffer.limit(oldLimit); if (ioBuffer.hasRemaining()) { return true; } } return false; } }
public class MessageEncoder extends ProtocolEncoderAdapter { @Override public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception { //TODO 根据协议编码 //组装好之后 ioSession.write(IoBuffer)写出 System.out.println(o); } }