LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器

1. LiteOS MQTT组件 概述 MQTT AL用来解耦基于MQTT的业务和MQTT的具体实现,具体来说以后的MQTT业务层应该有且只能使用MQTT AL提供的相关功能(API 数据结构 流程等)。MQTT AL定义MQTT的标准,用来屏蔽各个MQTT协议实现的差异(如软件库 或者硬件),让上层业务无需关心MQTT的实现部分。
MQTT AL的api接口声明在中,使用相关的接口需要包含该头文件,关于函数的详细参数请参考该头文件的声明。
配置并连接 对接服务器的所有信息保存在结构体mqtt_al_conpara_t中,其定义在mqtt_al.h中,如下:

/** @brief defines the paramter for the mqtt connect */ typedef struct { mqtt_al_string_tserveraddr; ///< mqtt server:support domain name and dot format intserverport; ///< mqtt server port mqtt_al_security_para_t*security; ///< if NULL,will use en_mqtt_security_none en_mqtt_al_verisonversion; ///< mqtt version will be used mqtt_al_string_tclientid; ///< mqtt connect client identifier mqtt_al_string_tuser; ///< mqtt connect user mqtt_al_string_tpasswd; ///< mqtt connect passwd intcleansession; ///< 1 clean the session while 0 not mqtt_al_willmsg_t*willmsg; ///< mqtt connect will message unsigned shortkeepalivetime; ///< keep alive time charconret; ///< mqtt connect code, return by server inttimeout; ///< how much time will be blocked }mqtt_al_conpara_t;

其中的一些参数值已经使用枚举给出:
  • security:安全连接参数(使用此需要确保mbedtls组件开启)
枚举值如下:
/** @briefthis enum all the transport encode we support now*/ typedef enum { en_mqtt_al_security_none = 0,///< no encode en_mqtt_al_security_psk,///< use the psk mode in transport layer en_mqtt_al_security_cas,///< use the ca mode in transport layer,only check the server en_mqtt_al_security_cacs,///< use the ca mode in transport layer,both check the server and client en_mqtt_al_security_end,///< the end for the mqtt }en_mqtt_al_security_t;

  • version:使用的MQTT协议版本
枚举值如下:
/** @brief enum the mqtt version*/ typedef enum { en_mqtt_al_version_3_1_0 = 0, en_mqtt_al_version_3_1_1, }en_mqtt_al_verison;

另外,在复制的时候还需要注意,很多字符串参数都是使用mqtt_al_string_t类型,其定义如下:
/** brief defines for all the ascii or data used in the mqtt engine */ typedef struct { char *data; ///< buffer to storage the data intlen; ///< buffer data length }mqtt_al_string_t; //used to represent any type string (maybe not ascii)

在配置结构体完成之后,调用配置函数进行配置并连接,API如下:
/** *@brief: you could use this function to connect to the mqtt server * *@param[in] conparamthe parameter we will use in connect, refer to the data mqtt_al_conpara_t *@ *@return: first you should check the return value then the return code in conparam * *@retval NULL which means you could not get the connect to the server,maybe network reason *@retval handle, which means you get the context, please check the conparam for more */ void * mqtt_al_connect( mqtt_al_conpara_t *conparam);

连接之后,首先应该检查返回的handle指针是否为空,其次应该检查mqtt_al_conpara_t结构体中conret的值,有以下枚举值:
/** @brief defines for the mqtt connect code returned by the server */ #define cn_mqtt_al_con_code_ok0///< has been accepted by the server #define cn_mqtt_al_con_code_err_version1///< server not support the version #define cn_mqtt_al_con_code_err_clientID2///< client identifier is error #define cn_mqtt_al_con_code_err_netrefuse3///< server service not ready yet #define cn_mqtt_al_con_code_err_u_p4///< bad user name or password #define cn_mqtt_al_con_code_err_auth5///< the client is not authorized #define cn_mqtt_al_con_code_err_unkown-1///< unknown reason #define cn_mqtt_al_con_code_err_network0x80 ///< network reason,you could try once more

订阅消息
EMQ-X服务器有心跳机制,实际应用中订阅之前应该先检查连接状态,本实验中暂不检查。
连接成功后,首先订阅消息,设置回调函数,方便接收下发的命令。
订阅消息的API如下:
/** * @brief you could use this function subscribe a topic from the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] subpararefer to the data mqtt_al_subpara_t * * @return 0 success-1failed * */ int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);

两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,subpara参数需要重点讲述。
mqtt_al_subpara_t的定义如下:
/** @brief defines the mqtt subscribe parameter*/ typedef struct { mqtt_al_string_ttopic; ///< topic will be subscribe en_mqtt_al_qos_tqos; ///< qos requested fn_mqtt_al_msg_dealerdealer; ///< message dealer:used to deal the received message void*arg; ///< used for the message dealer charsubret; ///< subscribe result code inttimeout; ///< how much time will be blocked }mqtt_al_subpara_t;

其中订阅消息质量qos的枚举值如下:
/** @brief enum all the qos supported for the application */ typedef enum { en_mqtt_al_qos_0 = 0,///< mqtt QOS 0 en_mqtt_al_qos_1,///< mqtt QOS 1 en_mqtt_al_qos_2,///< mqtt QOS 2 en_mqtt_al_qos_err }en_mqtt_al_qos_t;

dealer是一个函数指针,接收到下发命令之后会被回调,arg是回调函数参数,其定义如下:
/** @briefdefines the mqtt received message dealer, called by mqtt engine*/ typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);

订阅之后,可以通过mqtt_al_subpara_t结构体中的subret值查看是否订阅成功。
发布消息 发布消息的API如下:
/** * @brief you could use this function to publish a message to the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] msgthe message we will publish, see the data mqtt_al_pubpara_t * * @return 0 success-1failed * */ int mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);

两个参数中,handle参数是之前使用mqtt_al_connect时返回的指针,直接传入即可,pubpara参数需要重点讲述。
mqtt_al_pubpara_t的定义如下:
/** @brief defines for the mqtt publish */ typedef struct { mqtt_al_string_ttopic; ///< selected publish topic mqtt_al_string_tmsg; ///< message to be published en_mqtt_al_qos_tqos; ///< message qos intretain; ///< message retain :1 retain while 0 not inttimeout; ///< how much time will blocked }mqtt_al_pubpara_t;

MQTT组件自动初始化 MQTT在配置之后,会自动初始化。
在SDK目录中的IoT_LINK_1.0.0\iot_link\link_main.c文件中可以看到:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
2. 配置准备 Makefile配置 因为本次实验用到的组件较多:
  • AT框架
  • ESP8266设备驱动
  • 串口驱动框架
  • cJSON组件
  • SAL组件
  • MQTT组件
这些实验代码全部编译下来,有350KB,而小熊派开发板所使用的主控芯片STM32L431RCT6的 Flash 仅有256KB,会导致编译器无法链接出可执行文件,所以要在makefile中修改优化选项,修改为-Os参数,即最大限度的优化代码尺寸,并去掉-g参数,即代码只能下载运行,无法调试,如图:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
ESP8266设备配置 在工程目录中的OS_CONFIG/iot_link_config.h文件中,配置ESP8266设备的波特率和设备名称:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
WIFI对接信息配置
SDK:C:\Users\Administrator\.icode\sdk\IoT_LINK_1.0.0(其中Administrator是实验电脑的用户名)。
在SDK目录中的iot_link\network\tcpip\esp8266_socket\esp8266_socket_imp.c文件中,配置连接信息:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
之后修改同路径下的esp8266_socket_imp.mk文件,如图,将 TOP_DIR 改为 SDK_DIR :
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
修改paho_mqtt文件路径 在SDK目录中的iot_link\network\mqtt\paho_mqtt\paho_mqtt.mk文件中,如图,将 TOP_DIR 改为 SDK_DIR :
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
3. 使用mqtt.fx对接EMQ-X 配置 对接信息配置如下:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
其中ClientID随机生成一个即可。
订阅主题 使用mqtt.fx连接客户端,订阅本次实验中的两个主题:
  • 主题led_cmd:用于发布控制命令
  • 主题lightness:用于上报亮度
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
4. 上云实验 编写实验文件 在 Demo 文件夹下创建cloud_test_demo文件夹,在其中创建emqx_mqtt_demo.c文件。
编写代码:
#include #include #include #define DEFAULT_LIFETIME60 #define DEFAULT_SERVER_IPV4"122.51.89.94" #define DEFAULT_SERVER_PORT1883 #define CN_MQTT_EP_CLIENTID"emqx-test-001" #define CN_MQTT_EP_USERNAME"mculover666" #define CN_MQTT_EP_PASSWD"123456789" #define CN_MQTT_EP_SUB_TOPIC1"led_cmd" #define CN_MQTT_EP_PUB_TOPIC1"lightness"#define recv_buf_len 100 static char recv_buffer[recv_buf_len]; //下发数据接收缓冲区 static intrecv_datalen; //表示接收数据长度osal_semp_t recv_sync; //命令接收回调函数和处理函数之间的信号量char lightness_buf[10]; static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg) { if((msg->msg.len) < recv_buf_len) { //保存数据 memcpy(recv_buffer,msg->msg.data,msg->msg.len ); recv_buffer[msg->msg.len] = '\0'; recv_datalen = msg->msg.len; printf("recv buf: %s.\r\n", recv_buffer); //释放信号量,交由数据处理线程进行处理 osal_semp_post(recv_sync); } else { printf("recv buf is too small, len = %d.\r\n", msg->msg.len); } }static int task_recv_cmd_entry(void *args) { while(1) { /* 阻塞等待信号量 */ osal_semp_pend(recv_sync,cn_osal_timeout_forever); if(strstr(recv_buffer, "on")) { printf("-----------------LED ON !!! --------------------\r\n"); } else if(strstr(recv_buffer, "off")) { printf("-----------------LED OFF !!! --------------------\r\n"); } } return 0; }static int task_report_msg_entry(void *args) { int ret = -1; void *handle = NULL; mqtt_al_conpara_t config; mqtt_al_string_t str_temp; mqtt_al_subpara_t subpara_led_cmd; mqtt_al_pubpara_t pubpara_lightness; int lightness_value = https://www.it610.com/article/0; /* 配置结构体 */ str_temp.data = DEFAULT_SERVER_IPV4; str_temp.len= sizeof(DEFAULT_SERVER_IPV4); config.serveraddr = str_temp; config.serverport = DEFAULT_SERVER_PORT; config.security= en_mqtt_al_security_none; config.version= en_mqtt_al_version_3_1_0; str_temp.data = CN_MQTT_EP_CLIENTID; str_temp.len= sizeof(CN_MQTT_EP_CLIENTID); config.clientid= str_temp; str_temp.data = CN_MQTT_EP_USERNAME; str_temp.len= sizeof(CN_MQTT_EP_USERNAME); config.user= str_temp; str_temp.data = CN_MQTT_EP_PASSWD; str_temp.len= sizeof(CN_MQTT_EP_PASSWD); config.passwd= str_temp; config.cleansession = 1; config.willmsg= NULL; config.keepalivetime = DEFAULT_LIFETIME; config.timeout= 30; /* 配置并连接服务器 */ handle = mqtt_al_connect(&config); if(handle == NULL) { /* 连接出错 */ printf("config error.\r\n"); return -1; } else { /* 进一步检查服务器返回值 */ if(config.conret != cn_mqtt_al_con_code_ok) { /* 服务器返回值出错 */ printf("server return error, conret = %d.\r\n", config.conret); return -1; } else { printf("connect to server success.\r\n"); } }/* 连接成功后,订阅led_cmd主题消息 */ str_temp.data = https://www.it610.com/article/CN_MQTT_EP_SUB_TOPIC1; str_temp.len= sizeof(CN_MQTT_EP_SUB_TOPIC1); subpara_led_cmd.topic = str_temp; subpara_led_cmd.qos = en_mqtt_al_qos_0; subpara_led_cmd.dealer = mqtt_al_msg_dealer; subpara_led_cmd.arg = NULL; subpara_led_cmd.timeout = 60; ret =mqtt_al_subscribe(handle, &subpara_led_cmd); if(ret < 0) { printf("sub topic %s fail.\r\n", subpara_led_cmd.topic.data); return -1; } else { /* 进一步判断是否订阅成功 */ if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret) { printf("sub topic %s fail, subret = %d.\r\n", subpara_led_cmd.topic.data, subpara_led_cmd.subret); return -1; } else { printf("sub topic %s success.\r\n", subpara_led_cmd.topic.data); } }/* 每隔10s上报一次数据 */ str_temp.data = https://www.it610.com/article/CN_MQTT_EP_PUB_TOPIC1; str_temp.len= sizeof(CN_MQTT_EP_PUB_TOPIC1); pubpara_lightness.topic = str_temp; pubpara_lightness.qos = en_mqtt_al_qos_0; pubpara_lightness.retain = 0; pubpara_lightness.timeout = 30; while(1) { sprintf(lightness_buf,"%d", lightness_value); str_temp.data = https://www.it610.com/article/lightness_buf; str_temp.len= strlen(lightness_buf); pubpara_lightness.msg = str_temp; ret = mqtt_al_publish(handle, &pubpara_lightness); if(ret < 0) { printf("publish topic %s fail.\r\n", pubpara_lightness.topic.data); return -1; } else { printf("publish topic %s success. payload = %s, lightness = %d.\r\n", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value); } lightness_value++; osal_task_sleep(10*1000); } }int standard_app_demo_main() { /* 创建信号量 */ osal_semp_create(&recv_sync,1,0); /* 创建任务 */ osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8); osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8); return 0; }

添加路径 在user_demo.mk中添加如下:
#example for emqx_mqtt_demo ifeq ($(CONFIG_USER_DEMO), "emqx_mqtt_demo") user_demo_src= https://www.it610.com/article/${wildcard $(TOP_DIR)/targets/STM32L431_BearPi/Demos/cloud_test_demo/emqx_mqtt_demo.c} endif

添加位置如下:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
配置.sdkconfig LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
特别说明:实验时需要关闭shell组件,否则会因动态内存分配失败而无法连接。
数据上报实验结果 编译下载之后,可以在串口助手中看到输出信息:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
在订阅了该主题的客户端也可以看到上报数据:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
命令下发实验结果 在mqtt.fx中下发一条开启命令:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
可以看到设备后作出回应:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
再下发一条关闭命令:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片
【LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器】可以看到设备后作出回应:
LiteOS云端对接教程03-LiteOS基于MQTT对接EMQ-X服务器
文章图片

    推荐阅读