转载

mqtt 协议之 PINGREQ, PINGRESP

mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"

PINGREQ - PING request

fixed header format.

bit 7 6 5 4 3 2 1 0
byte 1 Message Type (12) DUP flag QoS level RETAIN
1 1 0 0 x x x x
byte 2 Remaining Length (0)
0 0 0 0 0 0 0 0

no variable header

no payload

response:   The response to a PINGREQ message is a PINGRESP message.

PINGRESP - PING response

fixed header

bit 7 6 5 4 3 2 1 0
byte 1 Message Type (13) DUP flag QoS level RETAIN
1 1 0 1 x x x x
byte 2 Remaining Length (0)
0 0 0 0 0 0 0 0

no variable header

no payload

------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

周期定义在 心跳频率在CONNECT(连接包)可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

ok ,上代码 :

固定头部 FinedHeader

/// <summary> /// Fixed header /// </summary> internal class FixedHeader {  /// <summary>  /// Message type  /// </summary>  public MessageType MessageType { get; set; }  /// <summary>  /// DUP flag  /// </summary>  public bool Dup { get; set; }  /// <summary>  /// QoS flags  /// </summary>  public Qos Qos { get; set; }  /// <summary>  /// RETAIN 保持  /// </summary>  public bool Retain { get; set; }  /// <summary>  /// Remaining Length 剩余长度  /// 单个字节最大值:01111111,16进制:0x7F,10进制为127。  /// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。  /// MQTT协议最多允许4个字节表示剩余长度。  /// 最大长度为:0xFF,0xFF,0xFF,0x7F,  /// 二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455  /// </summary>  public int RemaingLength { get; set; }  public FixedHeader() { }  public FixedHeader(Stream stream)  {   if (stream.Length < 2)    throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long.");   var byte1 = stream.ReadByte();   MessageType = (MessageType)((byte1 & 0xf0) >> 4);   Dup = ((byte1 & 0x08) >> 3) > 0;   Qos = (Qos)((byte1 & 0x06) >> 1);   Retain = (byte1 & 0x01) > 0;   //Remaining Length   //var byte2 = stream.ReadByte();   var lengthBytes = ReadLengthBytes(stream);   RemaingLength = CalculateLength(lengthBytes);  }  public void WriteTo(Stream stream)  {   var flags = (byte)MessageType << 4;   flags |= (Dup ? 1 : 0) << 3;   flags |= (byte)Qos << 1;   flags |= Retain ? 1 : 0;   stream.WriteByte((byte)flags);  //byte 1   if (RemaingLength == 0)   //byte 2    stream.WriteByte(0);   else   {    do    {     int digit = RemaingLength & 0x7f;     RemaingLength = RemaingLength >> 7;     if (RemaingLength > 0)      digit = digit | 0x80;     stream.WriteByte((byte)digit);    } while (RemaingLength > 0);   }  }  internal static byte[] ReadLengthBytes(Stream stream)  {   var lengthBytes = new List<byte>();   // read until we've got the entire size, or the 4 byte limit is reached   byte sizeByte;   int byteCount = 0;   do   {    sizeByte = (byte)stream.ReadByte();    lengthBytes.Add(sizeByte);   } while (++byteCount <= 4 && (sizeByte & 0x80) == 0x80);   return lengthBytes.ToArray();  }  internal static int CalculateLength(byte[] lengthBytes)  {   var remainingLength = 0;   var multiplier = 1;   foreach (var currentByte in lengthBytes)   {    remainingLength += (currentByte & 0x7f) * multiplier;    multiplier *= 0x80;   }   return remainingLength;  } } 

消息父类: Message

internal class Message {  public FixedHeader FixedHeader { get; protected set; }  public Message()  {  }  public Message(MessageType messageType)  {   FixedHeader = new FixedHeader   {    MessageType = messageType   };  }  public virtual void WriteTo(Stream stream)  {  }  public static Message CreateFrom(byte[] buffer)  {   using (var stream = new MemoryStream(buffer))   {    return CreateFrom(stream);   }  }  public static Message CreateFrom(Stream stream)  {   var header = new FixedHeader(stream);   return CreateMessage(header, stream);  }  public static Message CreateMessage(FixedHeader header, Stream stream)  {   switch (header.MessageType)   {    case MessageType.CONNACK:     return new ConnAckMessage(header, stream);    case MessageType.DISCONNECT:     return null;    case MessageType.PINGREQ:     return new PingReqMessage();    case MessageType.PUBACK:     return new PublishAckMessage(header, stream);    case MessageType.PUBCOMP:     //return new MqttPubcompMessage(str, header);    case MessageType.PUBLISH:     //return new MqttPublishMessage(str, header);    case MessageType.PUBREC:     //return new MqttPubrecMessage(str, header);    case MessageType.PUBREL:     //return new MqttPubrelMessage(str, header);    case MessageType.SUBACK:     //return new MqttSubackMessage(str, header);    case MessageType.UNSUBACK:     //return new MqttUnsubackMessage(str, header);    case MessageType.PINGRESP:     return new PingRespMessage(header, stream);    case MessageType.UNSUBSCRIBE:    case MessageType.CONNECT:    case MessageType.SUBSCRIBE:    default:     throw new Exception("Unsupported Message Type");   }  } } 

两个枚举:

MessageType  (消息类型)

Qos (服务质量等级)

 [Flags]  public enum MessageType : byte  {   CONNECT  = 1,   CONNACK  = 2,   PUBLISH  = 3,   PUBACK   = 4,   PUBREC   = 5,   PUBREL   = 6,   PUBCOMP  = 7,   SUBSCRIBE   = 8,   SUBACK   = 9,   UNSUBSCRIBE = 10,   UNSUBACK = 11,   PINGREQ  = 12,   PINGRESP = 13,   DISCONNECT  = 14  }  /// <summary>  /// 服务质量等级  /// </summary>  [Flags]  public enum Qos : byte  {   /// <summary>   ///  QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful.   /// </summary>   AtMostOnce = 0,   /// <summary>   ///  QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered   ///  more than once if network errors occur.   /// </summary>   AtLeastOnce = 1,   /// <summary>   ///  QOS Level 2 - Message will be delivered once, and only once. Message will be retried until   ///  it is successfully sent..   /// </summary>   ExactlyOnce = 2,  } 

ping 请求包:  PingReqMessage

响应包:         PingRespMessage

internal sealed class PingReqMessage : Message {  public PingReqMessage()   : base(MessageType.PINGREQ)  {  }  public override void WriteTo(Stream stream)  {   FixedHeader.WriteTo(stream);  } } internal class PingRespMessage : Message {  public PingRespMessage()   : base(MessageType.PINGRESP)  {  }  public PingRespMessage(FixedHeader header, Stream stream)  {   FixedHeader = header;  } } 

OK.

正文到此结束
Loading...