ESP8266 SDK开发——MQTT例程源码分析之PUBLISH报文生成

MQTT PUBLISH消息报文格式

查看MQTT官方的文档,得到固定完整PUBLISH消息发送报文格式:

固定报头 + 主题名(可变报头) + 有效载荷

固定报头:MQTT控制报文类型【7:4】已经固定为0011b、【3:0】位全部采用0。即固定报头:0x30

固定报头代码:

static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t* connection)
{
	//由于腾讯云只支持单条mqtt信息长度为1Kb以内,所以只需采用2个字节
	//另一个字节是固定报头的第一个字节(标志之类的)
  connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
  return MQTT_MAX_FIXED_HEADER_SIZE;
}

剩余长度:主题名的长度(占用的字节数) + 主题名长度占用的字节数(占用2个字节,因为采用了UTF-8编码) + 有效载荷长度(占用的字节数)

计算剩余长度的代码 

static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
	//计算剩余长度
  int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
  //buffer[0]应该是固定报头: 0x30(PUBLISH报文类型;DUP=0;Qos=0;RETAIN=0)
  //buffer[1]:剩余长度:低字节
  //buffer[2]:剩余长度:高字节
  //但是腾讯云MQTT的报文长度没有达到1024字节。

  if(remaining_length > 127)
  {
    connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    connection->buffer[1] = 0x80 | (remaining_length % 128); //剩余长度:低字节
    connection->buffer[2] = remaining_length / 128;	//剩余长度:高字节
    connection->message.length = remaining_length + 3;	//多此一举(可以删除).这里connection->message.length已经是整个mqtt报文长度了
    connection->message.data = connection->buffer;
  }
  else	//实际进入到这里了
  {
    connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
    connection->buffer[2] = remaining_length;	//剩余长度
    connection->message.length = remaining_length + 2;	//多此一举(可以删除).这里connection->message.length已经是整个mqtt报文长度了
    connection->message.data = connection->buffer + 1; //丢弃buffer[0]因为这一个字节没有用到。固定报文头只占2个字节
  }

  //->的优先级高于&,所以代码等效为&(connection->message)
  return &connection->message;
}
 
文档关于剩余长度的说明:剩余长度字段使用一个变长度编码方案,对小于 128 的值它使用单字节编码。更大的值按下面
的方式处理。低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个
字节可以编码 128 个数值和一个延续位( continuation bit )。剩余长度字段最大 4个字节
 
注:由于我们ESP8266 SDK中带的MQTT例程源码中定义的MQTT数据包长度为1024Kb,所以这里我们这里的剩余长度只要考虑 2个字节的情况就行了。
 
 
 描述76543210
Topic Name主题名
 
byte 1
Length MSB
XXXXXXXX
byte 2
Length LSB
XXXXXXXX
byte 3...
 XXXXXXXX

 

 

 

 

 

 

针对以上表格说明:

byte1和byte2表示主题名长度,主题名长度采用低字节在前编码。所以byte1其实是低字节,byte2表示高字节。

比如:byte1为0xc0,byte2为0x01。那么实际主题名长度为0x01c0换算成10进制为128+64=192

byt3...表示:这里有多个字节,这个需要根据具体的主题名决定。

Topic主题添加的代码:

static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
{
	//整个可变报头长度:Topic Name内容长度+Topic Name内容长度占用了2个字节 = len + 2
  if(connection->message.length + len + 2 > connection->buffer_length)
    return -1;

  //可变报头:存储Topic Name的长度需要以UTF-8编码存储。2个字节:低字节在前,高字节在后
  connection->buffer[connection->message.length++] = len >> 8;	//获取低字节数据
  connection->buffer[connection->message.length++] = len & 0xff;	//高字节数据

  //可变报头:存储Topic Name的内容
  memcpy(connection->buffer + connection->message.length, string, len);
  //消息长度:加上Topic Name的内容长度
  connection->message.length += len;

  //返回“可变报头”转换成mqtt格式所占用的字节长度(len + 2)
  return len + 2;
}

有效载荷

有效载荷包含将被发布的应用消息。数据的内容和格式是应用 特定的(一般用JOSN格式)
有效载荷的长度: 就是有效载荷字符串的长度。包含零长度有效载荷的 PUBLISH报文是合法的

 

MQTT PUBLISH源码分析:

因为ESP8266 SDK中的MQTT例程中MQTT数据包buffer长度最大为1024字节。所以剩余长度最多占用2个字节

有效载荷添加代码

  //判断当前data长度+已经保存的Topic Name + Topic Name长度占用的2字节 + 固定报头长度(3字节)
  //是否比缓存空间大
  if(connection->message.length + data_length > connection->buffer_length)
    return fail_message(connection);

  //有效载荷:拼接data数据到buffer,即将data数据填充到“有效载荷”缓冲区域
  //注意:!!!!有效载荷没有表示长度的字节!!!!
  memcpy(connection->buffer + connection->message.length, data, data_length);

//记录保存了“有效载荷”后的message长度
  connection->message.length += data_length;

完整的MQTT PUBLISH报文打包代码:

mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
{
	//connection->buffer空间为1024字节

	//初始化固定报头:给它预留3个字节空间用于储存数据
  init_message(connection);

  if(topic == NULL || topic[0] == '\0')
    return fail_message(connection);

  //拼接topic数据到buffer中
  if(append_string(connection, topic, strlen(topic)) < 0)
    return fail_message(connection);

  if(qos > 0)
  {
    if((*message_id = append_message_id(connection, 0)) == 0)
      return fail_message(connection);
  }
  else
    *message_id = 0;

  //判断当前data长度+已经保存的Topic Name + Topic Name长度占用的2字节 + 固定报头长度(3字节)
  //是否比缓存空间大
  if(connection->message.length + data_length > connection->buffer_length)
    return fail_message(connection);

  //有效载荷:拼接data数据到buffer,即将data数据填充到“有效载荷”缓冲区域
  //注意:!!!!有效载荷没有表示长度的字节!!!!
  memcpy(connection->buffer + connection->message.length, data, data_length);

  //记录保存了“有效载荷”后的message长度
  connection->message.length += data_length;

  //剩余长度:fini_message函数中将message中buffer指针
  return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}

 

 

 


版权声明:本文为twx11213030422原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。