先上官方文档

参考文献

一,MQTT协议的简介

网上很多这里不作讲解,参考这篇文章

二,ESP_MQTT的配置过程

ESP_MQTT实现了MQTT客户端的功能,支持多种协议,这里仅以TCp为例。基本步骤如下;

  1. 使用esp_mqtt_client_init()配置mqtt。
  2. 使用esp_mqtt_client_register_event()注册事件处理函数。
  3. 使用esp_mqtt_client_start()开启mqtt客户端。

MQTT的运行与WiFi类似,都是基于事件进行处理。但是,WiFi库是基于ESP_EVENT库而MQTT库则是使用的内建的事件处理循环。

初始化MQTT

1
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config);

使用该函数初始化MQTT客户端,并创建MQTT客户端句柄。

  • esp_mqtt_client_config_t
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
typedef struct esp_mqtt_client_config_t {
/**
* Broker 相关设置
*/
struct broker_t {
/**
* Broker address
*
* - uri 优先于其他字段
* - 如果uri没有设置,至少hostname, transport and port 应该被设置.
*/
struct address_t {
const char *uri; /*!< 完整的 *MQTT* broker URI */
const char *hostname; /*!< Hostname, to set ipv4 pass it as string) */
esp_mqtt_transport_t transport; /*!< 选择协议类型*/
const char *path; /*!< Path in the URI*/
uint32_t port; /*!< *MQTT* server port */
} address; /*!< Broker地址设置 */
/**
* Broker 的身份验证
*
* 如果不设置这部分,将不会验证Broker的身份,为安全考虑建议设置验证
*/
struct verification_t {
bool use_global_ca_store; /*!< 是否使用全局 ca_store, 详见 esp-tls文档*/
esp_err_t (*crt_bundle_attach)(void *conf); /*!< Pointer to ESP x509 Certificate Bundle attach function for
the usage of certificate bundles. */
const char *certificate; /*!< 证书数据,默认为NULL,验证服务器时不需要. */
size_t certificate_len; /*!< 证书数据长度 */
const struct psk_key_hint *psk_hint_key; /*!< 指向esp_tls.h中定义的PSK结构的指针,以启用PSK 认证(作为证书验证的替代).
只有当没有其他验证方式时启用*/
bool skip_cert_common_name_check; /*!< 跳过对服务器证书CN字段的任何验证,这降低了TLS的安全性,使*MQTT*客户端容易受到MITM攻击 */
const char **alpn_protos; /*!< 用于ALPN的以NULL为结尾的支持应用协议列表 */
} verification; /*!< broker的身份验证 */
} broker; /*!< Broker 地址和验证信息 */
/**
* Client related credentials for authentication.
*/
struct credentials_t {
const char *username; /*!< *MQTT* 用户名*/
const char *client_id; /*!< 设置 *MQTT* 客户端ID. 如果 set_null_client_id == true 忽略. 如果是NULL则为默认设置. Default client id is ``ESP32_%CHIPID%`` where `%CHIPID%` are last 3 bytes of MAC address in hex format */
bool set_null_client_id; /*!< 用户ID是否为NULL */
/**
* 客户端验证
*
* 对于使用TLS的相互认证,用户可以选择证书和密钥,
* 安全认证信息(如果设置了).
*
*/
struct authentication_t {
const char *password; /*!< *MQTT* 密码 */
const char *certificate; /*!< ssl认证的证书, 需要与key一起提供.如果没有验证则不需要*/
size_t certificate_len; /*!< 证书的长度.*/
const char *key; /*!< ssl认证的私钥,需要与证书一起提供,如果没有验证则不需要*/
size_t key_len; /*!< key的长度.*/
const char *key_password; /*!< 客户端密钥解密密码,不是PEM也不是DER, 如果提供,len也必须正确提供. */
int key_password_len; /*!< `key_password`的长度 */
bool use_secure_element; /*!< 启用安全元素,在ESP32-ROOM-32SE中可用,用于SSL连接。 */
void *ds_data; /*!< 数字签名,数字签名外设在一些Espressif设备中是可用的。 */
} authentication; /*!< 用户验证信息 */
} credentials; /*!< 用户验证信息 */
/**
* *MQTT* 会话相关设置
*/
struct session_t {
/**
* 遗嘱设置.
*/
struct last_will_t {
const char *topic; /*!< 遗嘱消息的主题 */
const char *msg; /*!< 遗嘱消息,以NULL空字符结尾*/
int msg_len; /*!< 遗嘱长度,如果msg不以NULL结尾,需要设置正确长度 */
int qos; /*!< 遗嘱消息质量 */
int retain; /*!< 遗嘱保留标志 */
} last_will; /*!< 遗嘱设置 */
bool disable_clean_session; /*!< *MQTT* 持久会话设置, */
int keepalive; /*!< *MQTT* keepalive时间, 默认 120 seconds */
bool disable_keepalive; /*!< 设置 `disable_keepalive=true` 来关闭 keep-alive, keepalive 默认开启.
Note: 设置keepalive为0不会关闭keepalive,而是使用默认的时间长度 */
esp_mqtt_protocol_ver_t protocol_ver; /*!< *MQTT* 使用的协议版本.*/
int message_retransmit_timeout; /*!< 重传超时时间 */
} session; /*!< *MQTT* 会话设置. */
/**
* 网络相关设置
*/
struct network_t {
int reconnect_timeout_ms; /*!< 断开连接后重新自动连接到broker间隔的时间,默认为10s*/
int timeout_ms; /*!< 网络操作超时时间,默认10s. */
int refresh_connection_after_ms; /*!< 刷新连接事件间隔 */
bool disable_auto_reconnect; /*!< 设置为true关闭自动重连 */
} network; /*!< 网络设置 */
/**
* 客户端任务设置
*/
struct task_t {
int priority; /*!< *MQTT* 任务优先级*/
int stack_size; /*!< *MQTT* 任务栈大小*/
} task; /*!< FreeRTOS 任务设置.*/
/**
* Client 用户缓冲区大小设置
*
* Client 有两个缓冲区:输入和输出缓冲区
*/
struct buffer_t {
int size; /*!< size of *MQTT* send/receive buffer*/
int out_size; /*!< size of *MQTT* output buffer. If not defined, defaults to the size defined by
``buffer_size`` */
} buffer; /*!< Buffer size configuration.*/
} esp_mqtt_client_config_t;

  • 使用 uri 字段的格式为 scheme://hostname:port/path
  • 当前支持 mqttmqttswswss 协议

  • 基于 TCP 的 MQTT 示例:

    • mqtt://mqtt.eclipseprojects.io:基于 TCP 的 MQTT,默认端口 1883

    • mqtt://mqtt.eclipseprojects.io:1884:基于 TCP 的 MQTT,端口 1884

    • mqtt://username:password@mqtt.eclipseprojects.io:1884:基于 TCP 的 MQTT, 端口 1884,带有用户名和密码

  • 基于 SSL 的 MQTT 示例:

    • mqtts://mqtt.eclipseprojects.io:基于 SSL 的 MQTT,端口 8883

    • mqtts://mqtt.eclipseprojects.io:8884:基于 SSL 的 MQTT,端口 8884

  • 基于 WebSocket 的 MQTT 示例:

    • ws://mqtt.eclipseprojects.io:80/mqtt
  • 基于 WebSocket Secure 的 MQTT 示例:

    • wss://mqtt.eclipseprojects.io:443/mqtt
  • 客户端凭据
    credentials 字段下包含所有客户端相关凭据。

    • username:指向用于连接服务器用户名的指针,也可通过 URI 设置

    • client_id:指向客户端 ID 的指针,默认为 ESP32_%CHIPID%,其中 %CHIPID% 是十六进制 MAC 地址的最后 3 个字节

    认证
    可以通过 authentication 字段设置认证参数。客户端支持以下认证方式:

    • password:使用密码

    • certificatekey:进行双向 TLS 身份验证,PEM 或 DER 格式均可

    • use_secure_element:使用 ESP32-WROOM-32SE 中的安全元素

    • ds_data:使用某些乐鑫设备的数字签名外设

MQTT 协议中的 Keep Alive 机制

MQTT 协议是承载于 TCP 协议之上的,而 TCP 协议以连接为导向,在连接双方之间,提供稳定、有序的字节流功能。 但是,在部分情况下,TCP 可能出现半连接问题。所谓半连接,是指某一方的连接已经断开或者没有建立,而另外一方的连接却依然维持着。在这种情况下,半连接的一方可能会持续不断地向对端发送数据,而显然这些数据永远到达不了对端。为了避免半连接导致的通信黑洞,MQTT 协议提供了 Keep Alive 机制,使客户端和 MQTT 服务器可以判定当前是否存在半连接问题,从而关闭对应连接。

MQTT持久会话与Clean Session详解

MQTT客户端在发起服务器连接时,可以设置是否创建一个持久会话。但Clean Session为true,指在创建一个新的会话时,在客户端断开连接时会话会自动销毁,为false时,在客户端断开连接时会话仍然保持,直到其重新连接或者会话超时注销。

持久会话可以避免因网络中断导致的重复订阅或者错过离线期间的消息。

注册事件函数

1
esp_err_t esp_mqtt_client_register_event(esp_mqtt_client_handle_t client, esp_mqtt_event_id_t event, esp_event_handler_t event_handler, void *event_handler_arg)

使用该函数注册事件处理函数。该事件处理的用法与esp_event库相似。

一个标准的事件处理模板为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* @brief Event handler registered to receive MQTT events
*
* This function is called by the MQTT client event loop.
*
* @param handler_args user data registered to the event.
* @param base Event base for the handler(always MQTT Base in this example).
* @param event_id The id for the received event.
* @param event_data The data for the event, esp_mqtt_event_handle_t.
*/
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client;
int msg_id;
switch ((esp_mqtt_event_id_t)event_id) {
case MQTT_EVENT_CONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);

msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);

msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);

msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;

case MQTT_EVENT_SUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED:
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED:
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA:
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno);
ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));

}
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}


//示例注册函数
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);

开始MQTT客户端

1
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client);

开启MQTT客户端服务。

订阅/取消订阅

1
int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos)

将客户端订阅到定义的主题,并定义了QOS。需要在连接服务器后进行。

1
int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic)

取消订阅。

发布内容

ESP提供了两种发布内容的方法,阻塞式和非阻塞。

1
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);

阻塞式发布一个消息。超时时间为config中的网络超时时间。使用这个API不需要先连接到服务器,此时消息会自动加入后台队列(如果设置了MQTT_SKIP_PUBLISH_IF_DISCONNECTED为true,则不会尝试发送而是直接返回-1)

1
int esp_mqtt_client_enqueue(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain, bool store)

将信息排队到outbox,以便稍后发送通常用于qos>0的消息,但如果store=true,也可用于qos=0的消息

这个API生成并存储发布消息到内部的outbox中,而实际发送至网络是在mqtt任务中进行的(与esp_mqtt_client_publish()相反,后者在用户任务的任务中立即发送发布消息)。因此,它可以作为esp_mqtt_client_publish()的一个非阻塞版本。

QOS服务质量

  • QoS0,At most once,至多一次,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了;
  • QoS1,At least once,至少一次,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息;
  • QoS2,Exactly once,确保只有一次,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

当我们使用MQTT客户端发布消息(PUBLISH)时,如果将RETAIN标志位设置为true,那么MQTT服务器会将最近收到的一条RETAIN标志位为true的消息保存在服务器端(内存或文件)。
特别注意:MQTT服务器只会为每一个Topic保存最近收到的一条RETAIN标志位为true的消息!也就是说,如果MQTT服务器上已经为某个Topic保存了一条Retained消息,当客户端再次发布一条新的Retained消息,那么服务器上原来的那条消息会被覆盖!

每当MQTT客户端连接到MQTT服务器并订阅了某个topic,如果该topic下有Retained消息,那么MQTT服务器会立即向客户端推送该条Retained消息。

如果客户端想让MQTT服务器删除某个Topic下保存的Retained消息,唯一的方法是向MQTT服务器发布一条RETAIN标志位为true空消息
空消息即为发布消息(PUBLISH)的时候,Payload中设置0个字节的内容。
删除了某个Topic下保存的Retained消息,如果客户端没有再发布Retained消息,则MQTT服务器上对于该Topic就没有了Retained消息。

其他API

1
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri);

更新uri。

1
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client);

手动重新连接。

1
esp_err_t esp_mqtt_client_disconnect(esp_mqtt_client_handle_t client);

手动断连。

1
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);

更新设置,在MQTT_EVENT_BEFORE_CONNECT前。

三,其他参考

事件参考

User event handler receives context data in esp_mqtt_event_t structure with

client - MQTT client handle

various other data depending on event type

  • MQTT_EVENT_ANY

  • MQTT_EVENT_ERROR

    在错误事件中,额外的上下文:连接返回代码,来自esp_tls的错误处理(如果支持)

  • MQTT_EVENT_CONNECTED

    connected event, additional context: session_present flag

  • MQTT_EVENT_DISCONNECTED

    disconnected event

  • MQTT_EVENT_SUBSCRIBED

    subscribed event, additional context:

    • msg_id message id

    • error_handle error_type in case subscribing failed

    • data pointer to broker response, check for errors.

    • data_len length of the data for this event

  • MQTT_EVENT_UNSUBSCRIBED

    unsubscribed event, additional context: msg_id

  • MQTT_EVENT_PUBLISHED

    published event, additional context: msg_id

  • MQTT_EVENT_DATA

    data event, additional context:

    • msg_id message id

    • topic pointer to the received topic

    • topic_len length of the topic

    • data pointer to the received data

    • data_len length of the data for this event

    • current_data_offset offset of the current data for this event

    • total_data_len total length of the data received

    • retain retain flag of the message

    • qos QoS level of the message

    • dup dup flag of the message Note: Multiple MQTT_EVENT_DATA could be fired for one message, if it is longer than internal buffer. In that case only first event contains topic pointer and length, other contain data only with current data length and current data offset updating.

  • MQTT_EVENT_BEFORE_CONNECT

    The event occurs before connecting

  • MQTT_EVENT_DELETED

    Notification on delete of one message from the internal outbox, if the message couldn’t have been sent and acknowledged before expiring defined in OUTBOX_EXPIRED_TIMEOUT_MS. (events are not posted upon deletion of successfully acknowledged messages)

    • This event id is posted only if MQTT_REPORT_DELETED_MESSAGES==1
    • Additional context: msg_id (id of the deleted message).
  • MQTT_USER_EVENT

    Custom event used to queue tasks into mqtt event handler All fields from the esp_mqtt_event_t type could be used to pass an additional context data to the handler.