MQTT PUBLISH消息报文格式
查看MQTT官方的文档,得到固定完整PUBLISH消息发送报文格式:
固定报头 + 主题名(可变报头) + 有效载荷
固定报头:MQTT控制报文类型【7:4】已经固定为0011b、【3:0】位全部采用0。即固定报头:0x30

固定报头代码:
static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t* connection)
{
//由于腾讯云只支持单条mqtt信息长度为1Kb以内,所以只需采用2个字节
//另一个字节是固定报头的第一个字节(标志之类的)
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}剩余长度:主题名的长度(占用的字节数) + 主题名长度占用的字节数(占用2个字节,因为采用了UTF-8编码) + 有效载荷长度(占用的字节数)
![]()
计算剩余长度的代码
static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
//计算剩余长度
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
//buffer[0]应该是固定报头: 0x30(PUBLISH报文类型;DUP=0;Qos=0;RETAIN=0)
//buffer[1]:剩余长度:低字节
//buffer[2]:剩余长度:高字节
//但是腾讯云MQTT的报文长度没有达到1024字节。
if(remaining_length > 127)
{
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[1] = 0x80 | (remaining_length % 128); //剩余长度:低字节
connection->buffer[2] = remaining_length / 128; //剩余长度:高字节
connection->message.length = remaining_length + 3; //多此一举(可以删除).这里connection->message.length已经是整个mqtt报文长度了
connection->message.data = connection->buffer;
}
else //实际进入到这里了
{
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[2] = remaining_length; //剩余长度
connection->message.length = remaining_length + 2; //多此一举(可以删除).这里connection->message.length已经是整个mqtt报文长度了
connection->message.data = connection->buffer + 1; //丢弃buffer[0]因为这一个字节没有用到。固定报文头只占2个字节
}
//->的优先级高于&,所以代码等效为&(connection->message)
return &connection->message;
}文档关于剩余长度的说明:剩余长度字段使用一个变长度编码方案,对小于 128 的值它使用单字节编码。更大的值按下面
的方式处理。低 7 位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个
字节可以编码 128 个数值和一个延续位( continuation bit )。剩余长度字段最大 4个字节 。
注:由于我们ESP8266 SDK中带的MQTT例程源码中定义的MQTT数据包长度为1024Kb,所以这里我们这里的剩余长度只要考虑 2个字节的情况就行了。
| 描述 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
Topic Name主题名 | |||||||||
byte 1 | Length MSB | X | X | X | X | X | X | X | X |
byte 2 | Length LSB | X | X | X | X | X | X | X | X |
byte 3... | X | X | X | X | X | X | X | X | |
针对以上表格说明:
byte1和byte2表示主题名长度,主题名长度采用低字节在前编码。所以byte1其实是低字节,byte2表示高字节。
比如:byte1为0xc0,byte2为0x01。那么实际主题名长度为0x01c0换算成10进制为128+64=192
byt3...表示:这里有多个字节,这个需要根据具体的主题名决定。
Topic主题添加的代码:
static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
{
//整个可变报头长度:Topic Name内容长度+Topic Name内容长度占用了2个字节 = len + 2
if(connection->message.length + len + 2 > connection->buffer_length)
return -1;
//可变报头:存储Topic Name的长度需要以UTF-8编码存储。2个字节:低字节在前,高字节在后
connection->buffer[connection->message.length++] = len >> 8; //获取低字节数据
connection->buffer[connection->message.length++] = len & 0xff; //高字节数据
//可变报头:存储Topic Name的内容
memcpy(connection->buffer + connection->message.length, string, len);
//消息长度:加上Topic Name的内容长度
connection->message.length += len;
//返回“可变报头”转换成mqtt格式所占用的字节长度(len + 2)
return len + 2;
}有效载荷
有效载荷包含将被发布的应用消息。数据的内容和格式是应用 特定的(一般用JOSN格式) 。
有效载荷的长度: 就是有效载荷字符串的长度。包含零长度有效载荷的 PUBLISH报文是合法的
MQTT PUBLISH源码分析:
因为ESP8266 SDK中的MQTT例程中MQTT数据包buffer长度最大为1024字节。所以剩余长度最多占用2个字节
有效载荷添加代码
//判断当前data长度+已经保存的Topic Name + Topic Name长度占用的2字节 + 固定报头长度(3字节)
//是否比缓存空间大
if(connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
//有效载荷:拼接data数据到buffer,即将data数据填充到“有效载荷”缓冲区域
//注意:!!!!有效载荷没有表示长度的字节!!!!
memcpy(connection->buffer + connection->message.length, data, data_length);
//记录保存了“有效载荷”后的message长度
connection->message.length += data_length;完整的MQTT PUBLISH报文打包代码:
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
{
//connection->buffer空间为1024字节
//初始化固定报头:给它预留3个字节空间用于储存数据
init_message(connection);
if(topic == NULL || topic[0] == '\0')
return fail_message(connection);
//拼接topic数据到buffer中
if(append_string(connection, topic, strlen(topic)) < 0)
return fail_message(connection);
if(qos > 0)
{
if((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
}
else
*message_id = 0;
//判断当前data长度+已经保存的Topic Name + Topic Name长度占用的2字节 + 固定报头长度(3字节)
//是否比缓存空间大
if(connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
//有效载荷:拼接data数据到buffer,即将data数据填充到“有效载荷”缓冲区域
//注意:!!!!有效载荷没有表示长度的字节!!!!
memcpy(connection->buffer + connection->message.length, data, data_length);
//记录保存了“有效载荷”后的message长度
connection->message.length += data_length;
//剩余长度:fini_message函数中将message中buffer指针
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
版权声明:本文为twx11213030422原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。