物联网|MQTT协议 STM32、ESP8266基于EMQ个人服务器——报文处理(1)

新人博主第一次写blog,本文主要是对于MQTT协议报文和stm32的硬件、代码层面进行讲解,有不足的地方欢迎大家指正。
本人在学习MQTT协议时走了不少弯路,而且网络上对于STM32与MQTT通讯的资料少之又少,所以写下本文供大家参考
前期准备 本次实验材料主要有: STM32F103ZET6开发板/核心板+ESP8266开发板+n个led灯
物联网|MQTT协议 STM32、ESP8266基于EMQ个人服务器——报文处理(1)
文章图片

上面这款ESP8266就可以,很方便使用
MQTT协议 基本原理 先讲一下MQTT是怎么通讯的吧
MQTT共有3个角色和1个数据流:发布者(Publisher),订阅者(Subscriber)、代理者(Broker)、主题(public)
相信大家都用过B站,我在这里做个比喻:b站用户好比是订阅者,up主是发布者,b站平台是代理者
我们之间有这样的关系:
物联网|MQTT协议 STM32、ESP8266基于EMQ个人服务器——报文处理(1)
文章图片

我们先只看蓝色部分就是一次MQTT通讯,up主发布视频到视频发布主题,b站服务器收到后通知用户。
当然我们可以建立多个public进行双向数据交流,互不干涉(笔者第一从想实现双向通讯,当时只建立了一个主题,导致双方的数据紊乱,大家记得避坑)
我们的STM32与EMQ服务器通讯也是同理,stm32(Pubilsher)与EMQ服务器的Websocket(Subscriber)同时订阅一个主题,就可以实现将stm32数据传输到主题中,Websocket从主题中获取数据

报文 由于报文内容太多,网络上的资料很详细,这里不在详细讲解,想了解报文的同学可以移步mqtt报文解析—超详细_不懂一休-CSDN博客_mqtt报文
笔者当时在学习报文的时候被搞得头大,所以为了方便大家学习使用,在这里我们只讲几个简单无脑常用的报文类型的写法
CONNECT:
固定报头:10 ??
此处?? 为剩余长度,即可变报头长度+负载长度,通常是构建完可变报头和负载才去计算的。举个例子,如果剩余长度小于128,例如剩余长度=88,则此位为0x58,固定报头为10 58 。如果大于128,则固定报头再加一个字节为10 ?? ?? ,例如剩余长度为152,则第2字节=152/128=1=0x01,第3字节=152%128=24=0x18,固定报头为 10 01 18。
ps:一定注意报文都是HEX(16进制)格式
可变报头:00 04 4D 51 54 54 04 C2 00 64
负载报文:?? "客户端ID" ?? "用户名" ?? "密码"
负载报文上的3个??分别为ID长度、用户名长度、密码长度,它们的计算方法与固定报头中的剩余长度一样,这里给大家举一个简单的例子
ID:123
用户名:LOUIS
密码:123456
则负载报文为:03 31 32 33 05 4C 4F 55 49 53 06 31 32 33 34 35 36
最终这个CONNECT报文就为:10 18 00 04 4D 51 54 54 04 C2 00 64 03 31 32 33 05 4C 4F 55 49 53 06 31 32 33 34 35 36
下面是connect报文数据的处理代码

/*----------------------------------------------------------*/ /*函数名:连接服务器报文*/ /*参数:无*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_ConectPack(void) { int temp,Remaining_len; Fixed_len = 1; //连接报文中,固定报头长度暂时先=1 Variable_len = 10; //连接报文中,可变报头长度=10 Payload_len = 2 + ClientID_len + 2 + Username_len + 2 + Passward_len; //连接报文中,负载长度 Remaining_len = Variable_len + Payload_len; //剩余长度=可变报头长度+负载长度 temp_buff[0]=0x10; //固定报头第1个字节 :固定0x01 do{//循环处理固定报头中的剩余长度字节,字节量根据剩余字节的真实长度变化 temp = Remaining_len%128; //剩余长度取余128 Remaining_len = Remaining_len/128; //剩余长度取整128 if(Remaining_len>0) temp |= 0x80; //按协议要求位7置位 temp_buff[Fixed_len] = temp; //剩余长度字节记录一个数据 Fixed_len++; //固定报头总长度+1 }while(Remaining_len>0); //如果Remaining_len>0的话,再次进入循环 temp_buff[Fixed_len+0]=0x00; //可变报头第1个字节 :固定0x00 temp_buff[Fixed_len+1]=0x04; //可变报头第2个字节 :固定0x04 temp_buff[Fixed_len+2]=0x4D; //可变报头第3个字节 :固定0x4D temp_buff[Fixed_len+3]=0x51; //可变报头第4个字节 :固定0x51 temp_buff[Fixed_len+4]=0x54; //可变报头第5个字节 :固定0x54 temp_buff[Fixed_len+5]=0x54; //可变报头第6个字节 :固定0x54 temp_buff[Fixed_len+6]=0x04; //可变报头第7个字节 :固定0x04 temp_buff[Fixed_len+7]=0xC2; //可变报头第8个字节 :使能用户名和密码校验,不使用遗嘱,不保留会话 temp_buff[Fixed_len+8]=0x00; //可变报头第9个字节 :保活时间高字节 0x00 temp_buff[Fixed_len+9]=0x64; //可变报头第10个字节:保活时间高字节 0x64100s /*CLIENT_ID*/ temp_buff[Fixed_len+10] = ClientID_len/256; //客户端ID长度高字节 temp_buff[Fixed_len+11] = ClientID_len%256; //客户端ID长度低字节 memcpy(&temp_buff[Fixed_len+12],ClientID,ClientID_len); //复制过来客户端ID字串 /*用户名*/ temp_buff[Fixed_len+12+ClientID_len] = Username_len/256; //用户名长度高字节 temp_buff[Fixed_len+13+ClientID_len] = Username_len%256; //用户名长度低字节 memcpy(&temp_buff[Fixed_len+14+ClientID_len],Username,Username_len); //复制过来用户名字串 /*密码*/ temp_buff[Fixed_len+14+ClientID_len+Username_len] = Passward_len/256; //密码长度高字节 temp_buff[Fixed_len+15+ClientID_len+Username_len] = Passward_len%256; //密码长度低字节 memcpy(&temp_buff[Fixed_len+16+ClientID_len+Username_len],Passward,Passward_len); //复制过来密码字串 TxDataBuf_Deal(temp_buff, Fixed_len + Variable_len + Payload_len); //加入发送数据缓冲区 }

Subscribe:
固定:82 ??(此处为剩余长度与connect同理计算)
可变:00 01
负载:?? "主题名" 00 (此处00为订阅等级Qs0)(计算与connect负载同理)
/*----------------------------------------------------------*/ /*函数名:SUBSCRIBE订阅topic报文*/ /*参数:QoS:订阅等级*/ /*参数:topic_name:订阅topic报文名称*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_Subscribe(char *topic_name, int QoS) { Fixed_len = 2; //SUBSCRIBE报文中,固定报头长度=2 Variable_len = 2; //SUBSCRIBE报文中,可变报头长度=2 Payload_len = 2 + strlen(topic_name) + 1; //计算有效负荷长度 = 2字节(topic_name长度)+ topic_name字符串的长度 + 1字节服务等级 temp_buff[0]=0x82; //第1个字节 :固定0x82 temp_buff[1]=Variable_len + Payload_len; //第2个字节 :可变报头+有效负荷的长度 temp_buff[2]=0x00; //第3个字节 :报文标识符高字节,固定使用0x00 temp_buff[3]=0x01; //第4个字节 :报文标识符低字节,固定使用0x01 temp_buff[4]=strlen(topic_name)/256; //第5个字节 :topic_name长度高字节 temp_buff[5]=strlen(topic_name)%256; //第6个字节 :topic_name长度低字节 memcpy(&temp_buff[6],topic_name,strlen(topic_name)); //第7个字节开始 :复制过来topic_name字串 temp_buff[6+strlen(topic_name)]=QoS; //最后1个字节:订阅等级 TxDataBuf_Deal(temp_buff, Fixed_len + Variable_len + Payload_len); //加入发送数据缓冲区 }

PING:
固定:C0 00
/*----------------------------------------------------------*/ /*函数名:PING报文,心跳包*/ /*参数:无*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_PingREQ(void) { temp_buff[0]=0xC0; //第1个字节 :固定0xC0 temp_buff[1]=0x00; //第2个字节 :固定0x00 TxDataBuf_Deal(temp_buff, 2); //加入数据到缓冲区 }

Publish:
固定:30 ??
可变:?? "主题名"
??为主题名的长度
负载:?? "要发送的数据"
??为数据的长度
/*----------------------------------------------------------*/ /*函数名:等级0 发布消息报文*/ /*参数:topic_name:topic名称*/ /*参数:data:数据*/ /*参数:data_len:数据长度*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_PublishQs0(char *topic, char *data, int data_len) { int temp,Remaining_len; Fixed_len = 1; //固定报头长度暂时先等于:1字节 Variable_len = 2 + strlen(topic); //可变报头长度:2字节(topic长度)+ topic字符串的长度 Payload_len = data_len; //有效负荷长度:就是data_len Remaining_len = Variable_len + Payload_len; //剩余长度=可变报头长度+负载长度 temp_buff[0]=0x30; //固定报头第1个字节 :固定0x30 do{//循环处理固定报头中的剩余长度字节,字节量根据剩余字节的真实长度变化 temp = Remaining_len%128; //剩余长度取余128 Remaining_len = Remaining_len/128; //剩余长度取整128 if(Remaining_len>0) temp |= 0x80; //按协议要求位7置位 temp_buff[Fixed_len] = temp; //剩余长度字节记录一个数据 Fixed_len++; //固定报头总长度+1 }while(Remaining_len>0); //如果Remaining_len>0的话,再次进入循环 temp_buff[Fixed_len+0]=strlen(topic)/256; //可变报头第1个字节:topic长度高字节 temp_buff[Fixed_len+1]=strlen(topic)%256; //可变报头第2个字节:topic长度低字节 memcpy(&temp_buff[Fixed_len+2],topic,strlen(topic)); //可变报头第3个字节开始 :拷贝topic字符串 memcpy(&temp_buff[Fixed_len+2+strlen(topic)],data,data_len); //有效负荷:拷贝data数据 TxDataBuf_Deal(temp_buff, Fixed_len + Variable_len + Payload_len); //加入发送数据缓冲区 }

处理Publisher发送的命令,笔者这里用的是EMQ个人服务器所以Publisher发的报文相对简单
只有长度+数据的格式
下面是对收到的数据的处理
*----------------------------------------------------------*/ /*函数名:处理服务器发来的等级0的推送*/ /*参数:redata:接收的数据*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_DealPushdata_Qs0(unsigned char *redata) { intre_len; //定义一个变量,存放接收的数据总长度 intpack_num; //定义一个变量,当多个推送一起过来时,保存推送的个数 inttemp,temp_len; //定义一个变量,暂存数据 inttotle_len; //定义一个变量,存放已经统计的推送的总数据量 inttopic_len; //定义一个变量,存放推送中主题的长度 intcmd_len; //定义一个变量,存放推送中包含的命令数据的长度 intcmd_loca; //定义一个变量,存放推送中包含的命令的起始位置 inti; //定义一个变量,用于for循环 intlocal,multiplier; unsigned char tempbuff[RBUFF_UNIT]; //临时缓冲区 unsigned char *data; //redata过来的时候,第一个字节是数据总量,data用于指向redata的第2个字节,真正的数据开始的地方 re_len = redata[0]*256+redata[1]; //获取接收的数据总长度 data = https://www.it610.com/article/&redata[2]; //data指向redata的第2个字节,真正的数据开始的 pack_num = temp_len = totle_len = temp = 0; //各个变量清零 local = 1; multiplier = 1; do{ pack_num++; //开始循环统计推送的个数,每次循环推送的个数+1 do{ temp = data[totle_len + local]; temp_len += (temp & 127) * multiplier; multiplier *= 128; local++; }while ((temp & 128) != 0); totle_len += (temp_len + local); //累计统计的总的推送的数据长度 re_len -= (temp_len + local) ; //接收的数据总长度 减去 本次统计的推送的总长度 local = 1; multiplier = 1; temp_len = 0; }while(re_len!=0); //如果接收的数据总长度等于0了,说明统计完毕了 u1_printf("本次接收了%d个推送数据\r\n",pack_num); //串口输出信息 temp_len = totle_len = 0; //各个变量清零 local = 1; multiplier = 1; for(i=0; i

初始化 下面是各部分缓冲区初始化,及上面函数中变量的相关定义
#include "stm32f10x.h" #include "mqtt.h" #include "string.h" #include "stdio.h" #include "usart1.h" #include "wifi.h"unsigned charMQTT_RxDataBuf[R_NUM][RBUFF_UNIT]; //数据的接收缓冲区,所有服务器发来的数据,存放在该缓冲区,缓冲区第一个字节存放数据长度 unsigned char *MQTT_RxDataInPtr; //指向接收缓冲区存放数据的位置 unsigned char *MQTT_RxDataOutPtr; //指向接收缓冲区读取数据的位置 unsigned char *MQTT_RxDataEndPtr; //指向接收缓冲区结束的位置unsigned charMQTT_TxDataBuf[T_NUM][TBUFF_UNIT]; //数据的发送缓冲区,所有发往服务器的数据,存放在该缓冲区,缓冲区第一个字节存放数据长度 unsigned char *MQTT_TxDataInPtr; //指向发送缓冲区存放数据的位置 unsigned char *MQTT_TxDataOutPtr; //指向发送缓冲区读取数据的位置 unsigned char *MQTT_TxDataEndPtr; //指向发送缓冲区结束的位置unsigned charMQTT_CMDBuf[C_NUM][CBUFF_UNIT]; //命令数据的接收缓冲区 unsigned char *MQTT_CMDInPtr; //指向命令缓冲区存放数据的位置 unsigned char *MQTT_CMDOutPtr; //指向命令缓冲区读取数据的位置 unsigned char *MQTT_CMDEndPtr; //指向命令缓冲区结束的位置char ClientID[128]; //存放客户端ID的缓冲区 intClientID_len; //存放客户端ID的长度char Username[128]; //存放用户名的缓冲区 intUsername_len; //存放用户名的长度char Passward[128]; //存放密码的缓冲区 intPassward_len; //存放密码的长度char ServerIP[128]; //存放服务器IP或是域名 intServerPort; //存放服务器的端口号intFixed_len; //固定报头长度 intVariable_len; //可变报头长度 intPayload_len; //有效负荷长度 unsigned chartemp_buff[TBUFF_UNIT]; //临时缓冲区,构建报文用char Ping_flag; //ping报文状态0:正常状态,等待计时时间到,发送Ping报文 //ping报文状态1:Ping报文已发送,当收到 服务器回复报文的后 将1置为0 char Connect_flag; //同服务器连接状态0:还没有连接服务器1:连接上服务器了 char ConnectPack_flag; //CONNECT报文状态1:CONNECT报文成功 char SubcribePack_flag; //订阅报文状态1:订阅报文成功/*----------------------------------------------------------*/ /*函数名:初始化接收,发送,命令数据的 缓冲区 以及各状态参数*/ /*参数:无*/ /*返回值:无*/ /*----------------------------------------------------------*/ void MQTT_Buff_Init(void) { MQTT_RxDataInPtr=MQTT_RxDataBuf[0]; //指向发送缓冲区存放数据的指针归位 MQTT_RxDataOutPtr=MQTT_RxDataInPtr; //指向发送缓冲区读取数据的指针归位 MQTT_RxDataEndPtr=MQTT_RxDataBuf[R_NUM-1]; //指向发送缓冲区结束的指针归位 MQTT_TxDataInPtr=MQTT_TxDataBuf[0]; //指向发送缓冲区存放数据的指针归位 MQTT_TxDataOutPtr=MQTT_TxDataInPtr; //指向发送缓冲区读取数据的指针归位 MQTT_TxDataEndPtr=MQTT_TxDataBuf[T_NUM-1]; //指向发送缓冲区结束的指针归位 MQTT_CMDInPtr=MQTT_CMDBuf[0]; //指向命令缓冲区存放数据的指针归位 MQTT_CMDOutPtr=MQTT_CMDInPtr; //指向命令缓冲区读取数据的指针归位 MQTT_CMDEndPtr=MQTT_CMDBuf[C_NUM-1]; //指向命令缓冲区结束的指针归位MQTT_ConectPack(); //发送缓冲区添加连接报文 MQTT_Subscribe(S_TOPIC_NAME,0); //发送缓冲区添加订阅topic,等级0 Ping_flag = ConnectPack_flag = SubcribePack_flag = 0; //各个参数清零 } /*----------------------------------------------------------*/ /*函数名:云初始化参数,得到客户端ID,用户名和密码*/ /*参数:无*/ /*返回值:无*/ /*----------------------------------------------------------*/ void IoT_Parameter_Init(void) { memset(ClientID,128,0); //客户端ID的缓冲区全部清零 sprintf(ClientID,"%s",DEVICEID); //构建客户端ID,并存入缓冲区 ClientID_len = strlen(ClientID); //计算客户端ID的长度 memset(Username,128,0); //用户名的缓冲区全部清零 sprintf(Username,"%s",PRODUCTID); //构建用户名,并存入缓冲区 Username_len = strlen(Username); //计算用户名的长度 memset(Passward,128,0); //用户名的缓冲区全部清零 sprintf(Passward,"%s",AUTHENTICATION); //构建密码,并存入缓冲区 Passward_len = strlen(Passward); //计算密码的长度 memset(ServerIP,0,128); sprintf(ServerIP,"%s","39.104.27.119"); //构建服务器域名 ServerPort = 1883; //服务器端口号1883 u1_printf("服 务 器:%s:%d\r\n",ServerIP,ServerPort); //串口输出调试信息 u1_printf("客户端ID:%s\r\n",ClientID); //串口输出调试信息 u1_printf("用 户 名:%s\r\n",Username); //串口输出调试信息 u1_printf("密码:%s\r\n",Passward); //串口输出调试信息 } /*----------------------------------------------------------*/ /*函数名:处理发送缓冲区*/ /*参数:data:数据*/ /*参数:size:数据长度*/ /*返回值:无*/ /*----------------------------------------------------------*/ void TxDataBuf_Deal(unsigned char *data, int size) { memcpy(&MQTT_TxDataInPtr[2],data,size); //拷贝数据到发送缓冲区 MQTT_TxDataInPtr[0] = size/256; //记录数据长度 MQTT_TxDataInPtr[1] = size%256; //记录数据长度 MQTT_TxDataInPtr+=TBUFF_UNIT; //指针下移 if(MQTT_TxDataInPtr==MQTT_TxDataEndPtr)//如果指针到缓冲区尾部了 MQTT_TxDataInPtr = MQTT_TxDataBuf[0]; //指针归位到缓冲区开头 } /*----------------------------------------------------------*/ /*函数名:处理命令缓冲区*/ /*参数:data:数据*/ /*参数:size:数据长度*/ /*返回值:无*/ /*----------------------------------------------------------*/ void CMDBuf_Deal(unsigned char *data, int size) { memcpy(&MQTT_CMDInPtr[2],data,size); //拷贝数据到命令缓冲区 MQTT_CMDInPtr[0] = size/256; //记录数据长度 MQTT_CMDInPtr[1] = size%256; //记录数据长度 MQTT_CMDInPtr[size+2] = '\0'; //加入字符串结束符 MQTT_CMDInPtr+=CBUFF_UNIT; //指针下移 if(MQTT_CMDInPtr==MQTT_CMDEndPtr)//如果指针到缓冲区尾部了 MQTT_CMDInPtr = MQTT_CMDBuf[0]; //指针归位到缓冲区开头 }


头文件 为了方便大家使用代码笔者顺便把mqtt头文件贴出来吧
#ifndef __MQTT_H #define __MQTT_H#defineR_NUM5//接收缓冲区个数 #defineRBUFF_UNIT300//接收缓冲区长度#defineT_NUM5//发送缓冲区个数 #defineTBUFF_UNIT300//发送缓冲区长度#defineC_NUM5//命令缓冲区个数 #defineCBUFF_UNIT300//命令缓冲区长度#defineMQTT_TxData(x)u2_TxData(x)//串口2负责数据发送#definePRODUCTID"Car001"//ID #definePRODUCTID_LENstrlen(PRODUCTID)//ID长度 #defineDEVICEID"OurIoT"//用户名 #defineDEVICEID_LENstrlen(DEVICEID)//用户名长度 #defineAUTHENTICATION"000000"//密码 #defineAUTHENTICATION_LENstrlen(AUTHENTICATION)//密码长度 #defineS_TOPIC_NAME"t/c"//需要订阅的主题 #defineP_TOPIC_NAME"t/v"//需要发布的主题extern unsigned charMQTT_RxDataBuf[R_NUM][RBUFF_UNIT]; //外部变量声明,数据的接收缓冲区,所有服务器发来的数据,存放在该缓冲区,缓冲区第一个字节存放数据长度 extern unsigned char *MQTT_RxDataInPtr; //外部变量声明,指向缓冲区存放数据的位置 extern unsigned char *MQTT_RxDataOutPtr; //外部变量声明,指向缓冲区读取数据的位置 extern unsigned char *MQTT_RxDataEndPtr; //外部变量声明,指向缓冲区结束的位置 extern unsigned charMQTT_TxDataBuf[T_NUM][TBUFF_UNIT]; //外部变量声明,数据的发送缓冲区,所有发往服务器的数据,存放在该缓冲区,缓冲区第一个字节存放数据长度 extern unsigned char *MQTT_TxDataInPtr; //外部变量声明,指向缓冲区存放数据的位置 extern unsigned char *MQTT_TxDataOutPtr; //外部变量声明,指向缓冲区读取数据的位置 extern unsigned char *MQTT_TxDataEndPtr; //外部变量声明,指向缓冲区结束的位置 extern unsigned charMQTT_CMDBuf[C_NUM][CBUFF_UNIT]; //外部变量声明,命令数据的接收缓冲区 extern unsigned char *MQTT_CMDInPtr; //外部变量声明,指向缓冲区存放数据的位置 extern unsigned char *MQTT_CMDOutPtr; //外部变量声明,指向缓冲区读取数据的位置 extern unsigned char *MQTT_CMDEndPtr; //外部变量声明,指向缓冲区结束的位置extern char ClientID[128]; //外部变量声明,存放客户端ID的缓冲区 extern intClientID_len; //外部变量声明,存放客户端ID的长度 extern char Username[128]; //外部变量声明,存放用户名的缓冲区 extern intUsername_len; //外部变量声明,存放用户名的长度 extern char Passward[128]; //外部变量声明,存放密码的缓冲区 extern intPassward_len; //外部变量声明,存放密码的长度 extern char ServerIP[128]; //外部变量声明,存放服务器IP或是域名 extern intServerPort; //外部变量声明,存放服务器的端口号extern char Ping_flag; //外部变量声明,ping报文状态0:正常状态,等待计时时间到,发送Ping报文 //外部变量声明,ping报文状态1:Ping报文已发送,当收到 服务器回复报文的后 将1置为0 extern char Connect_flag; //外部变量声明,同服务器连接状态0:还没有连接服务器1:连接上服务器了 extern char ReConnect_flag; //外部变量声明,重连服务器状态0:连接还存在1:连接断开,重连 extern char ConnectPack_flag; //外部变量声明,CONNECT报文状态1:CONNECT报文成功 extern char SubcribePack_flag; //外部变量声明,订阅报文状态1:订阅报文成功void MQTT_Buff_Init(void); void IoT_Parameter_Init(void); void MQTT_ConectPack(void); void MQTT_Subscribe(char *, int); void MQTT_PingREQ(void); void MQTT_PublishQs0(char *, char *, int); void MQTT_DealPushdata_Qs0(unsigned char *); void TxDataBuf_Deal(unsigned char *, int); void CMDBuf_Deal(unsigned char *, int); #endif

本节主要讲解了报文的数据发送处理,之后会更新esp8266如何连接mqtt服务器,以及stm32对服务器命令的相关处理,最终实现服务器控制点灯。
物联网|MQTT协议 STM32、ESP8266基于EMQ个人服务器——报文处理(1)
文章图片








【物联网|MQTT协议 STM32、ESP8266基于EMQ个人服务器——报文处理(1)】

    推荐阅读