MQTT 在 ESP32-C3 上的使用方法
1. 前提知识?
1. MQTT 是什么?
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,用于在客户端和服务器之间进行消息传递。MQTT 协议基于发布 - 订阅模式,客户端可以订阅感兴趣的主题,服务器会将消息发布到这些主题上。MQTT 协议具有以下特点:
- 轻量级:MQTT 协议的消息头非常小,只有 2 字节,因此可以在低带宽环境下使用。
- 可靠:MQTT 协议支持 QoS(服务质量)级别,QoS 级别可以保证消息的可靠传递。
- 简单:MQTT 协议非常简单,易于实现和使用。
- 跨平台:MQTT 协议可以在不同的平台上使用,包括 Windows、Linux、Android、iOS 等。
- 安全性:MQTT 协议支持 SSL/TLS 加密,因此可以保证消息的安全性。
- 可扩展性:MQTT 协议支持扩展,用户可以根据自己的需求进行扩展。
2. MQTT需要一个 Broker
MQTT 协议需要一个 Broker(代理服务器)来进行消息的传递。Broker 是一个中间件,用于接收客户端的连接请求,并且将消息传递给订阅了该主题的客户端。Broker 可以是一个独立的服务器,也可以是一个云服务。
推荐使用 EMQX,一个开源的 MQTT Broker,它支持多种协议,包括 MQTT、MQTT-SN、CoAP 等,并且具有高性能、高可用性和易扩展性等特点。EMQX 还提供了丰富的功能,例如消息持久化、规则引擎、数据可视化等。
3. MQTT Broker 相关配置
在使用 MQTT Broker 时,需要进行一些配置,客户端连接时,必须要提供以下信息:
- 配置 Broker 的地址和端口。 (必需的)
- 配置 Broker 的用户名和密码。(如果开启了客户端认证,那就是必需的)
- 连接 MQTT Broker 的客户端 ID。(必需的)
2. ESP-C3 上的 开发Mqtt-client
2.1 需要引入mqtt库
在 CMakeLists.txt
中添加以下代码:
idf_component_register(SRCS "mqtt_example.c"
INCLUDE_DIRS "."
REQUIRES esp_event esp_log mqtt)
在 REQUIRES
中添加 mqtt
库,这样就可以在代码中使用 mqtt
库了。
2.2 引入相应的头文件
#include <stdio.h>
#include <string.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <esp_system.h>
#include <esp_event.h>
#include <esp_log.h>
#include "mqtt_client.h"
2.3 配置 MQTT Client 参数
static void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_BROKER_URL,
.broker.address.port = 1883,
.credentials.client_id = "esp32-ABCD",
.credentials.username = "mqtt",
.credentials.authentication.password = "mqtt",
};
client = esp_mqtt_client_init(&mqtt_cfg);
}
在这个函数中,要指定要连接的 MQTT Broker 的地址和端口,以及要连接的客户端 ID、用户名和密码。 最终得到一个 esp_mqtt_client_handle_t client
对象,这个对象可以用来连接 MQTT Broker。
为了在其它函数中,可以使用这个对象,我们需要将这个对象声明为全局变量。
2.4 设置 MQTT 事件回调函数
// 具体的回调函数
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client; // 客户端句柄
int msg_id;
switch ((esp_mqtt_event_id_t)event_id)
{
case MQTT_EVENT_CONNECTED: // mqtt 连接成功
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED: // mqtt 断开连接
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED: // 订阅成功
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED: // 取消订阅成功
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED: // 发布成功
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA: // 收到消息
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR:
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT)
{
log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno);
ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
}
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
2.5 注册 MQTT 事件回调函数
static void mqtt_register_event(void)
{
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, client);
}
2.6 启动 MQTT Client
static void mqtt_app_start(void)
{
esp_mqtt_client_start(client);
}
启动后,会自动连接到 MQTT Broker,然后会调用回调函数,回调函数会根据事件,调用相应的函数。而且会自动重连。无论断开多长时间,都会自动重连。
2.7 停止 MQTT Client
static void mqtt_app_stop(void)
{
esp_mqtt_client_stop(client);
}
停止后,会自动断开连接,然后会调用回调函数,回调函数会根据事件,调用相应的函数。此时,自动重连也会停止。
2.8 断开 MQTT Client 连接 Borker
static void mqtt_app_disconnect(void)
{
esp_mqtt_client_disconnect(client);
}
断开后,会立刻断开连接,同时触发相应的事件。但过一会儿,会自动重新连
接。
2.9 连接 MQTT Broker
static void mqtt_app_connect(void)
{
esp_mqtt_client_connect(client);
}
主动连接 Broker,会立刻连接 Broker,同时触发相应的事件。但是必须是在 esp_mqtt_client_start()
之后调用,也就是必须要开始之后。
2.10 发布消息
static void mqtt_app_publish(void)
{
esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
}
2.11 获取消息
消息的获取,不是使用一个函数,而是在回调中获取。即:mqtt_event_handler
函数中的MQTT_EVENT_DATA
事件中的 event->data
。
2.12 订阅主题
static void mqtt_app_subscribe(void)
{
esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
}
2.13 取消订阅主题
static void mqtt_app_unsubscribe(void)
{
esp_mqtt_client_unsubscribe(client, "/topic/qos0");
}
3. 完整代码
/* MQTT (over TCP) Example
This example code is in the Public Domain (or CC0 licensed, at your option.)
Unless required by applicable law or agreed to in writing, this
software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied.
*/
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <stdlib.h>
#include <inttypes.h>
#include "esp_system.h"
#include "nvs_flash.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "protocol_examples_common.h"
#include "esp_log.h"
#include "mqtt_client.h"
static const char *TAG = "mqtt_example";
esp_mqtt_client_handle_t client;
static void log_error_if_nonzero(const char *message, int error_code)
{
if (error_code != 0)
{
ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code);
}
}
/*
* @brief Event handler registered to receive MQTT events
*
* This function is called by the MQTT client event loop.
*
* @param handler_args user data registered to the event.
* @param base Event base for the handler(always MQTT Base in this example).
* @param event_id The id for the received event.
* @param event_data The data for the event, esp_mqtt_event_handle_t.
*/
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id);
esp_mqtt_event_handle_t event = event_data;
esp_mqtt_client_handle_t client = event->client; // 客户端句柄
int msg_id;
switch ((esp_mqtt_event_id_t)event_id)
{
case MQTT_EVENT_CONNECTED: // mqtt 连接成功
ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1);
ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id);
msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1");
ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_DISCONNECTED: // mqtt 断开连接
ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
break;
case MQTT_EVENT_SUBSCRIBED: // 订阅成功
ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0);
ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
break;
case MQTT_EVENT_UNSUBSCRIBED: // 取消订阅成功
ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_PUBLISHED: // 发布成功
ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
break;
case MQTT_EVENT_DATA: // 收到消息
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
printf("DATA=%.*s\r\n", event->data_len, event->data);
break;
case MQTT_EVENT_ERROR: // 错误处理,断开连接,或是服务器掉线会触发这个消息
ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT)
{
log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno);
ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
}
break;
default:
ESP_LOGI(TAG, "Other event id:%d", event->event_id);
break;
}
}
static void mqtt_app_start(void)
{
esp_mqtt_client_config_t mqtt_cfg = {
.broker.address.uri = CONFIG_BROKER_URL,
.broker.address.port = 1883,
.credentials.client_id = "esp32-ABCD",
.credentials.username = "mqtt",
.credentials.authentication.password = "mqtt",
};
#if CONFIG_BROKER_URL_FROM_STDIN
char line[128];
if (strcmp(mqtt_cfg.broker.address.uri, "FROM_STDIN") == 0)
{
int count = 0;
printf("Please enter url of mqtt broker\n");
while (count < 128)
{
int c = fgetc(stdin);
if (c == '\n')
{
line[count] = '\0';
break;
}
else if (c > 0 && c < 127)
{
line[count] = c;
++count;
}
vTaskDelay(10 / portTICK_PERIOD_MS);
}
mqtt_cfg.broker.address.uri = line;
printf("Broker url: %s\n", line);
}
else
{
ESP_LOGE(TAG, "Configuration mismatch: wrong broker url");
abort();
}
#endif /* CONFIG_BROKER_URL_FROM_STDIN */
// esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
client = esp_mqtt_client_init(&mqtt_cfg);
/* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */
esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
esp_mqtt_client_start(client);
}
void app_mqtt_stop(void)
{
esp_mqtt_client_stop(client);
}
void app_main(void)
{
ESP_LOGI(TAG, "[APP] Startup..");
ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size());
ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version());
// esp_log_level_set("*", ESP_LOG_INFO); // 设置日志等级
// esp_log_level_set("mqtt_client", ESP_LOG_VERBOSE); // 设置mqtt日志等级
// esp_log_level_set("mqtt_example", ESP_LOG_VERBOSE);
// esp_log_level_set("transport_base", ESP_LOG_VERBOSE);
// esp_log_level_set("esp-tls", ESP_LOG_VERBOSE);
// esp_log_level_set("transport", ESP_LOG_VERBOSE);
// esp_log_level_set("outbox", ESP_LOG_VERBOSE);
ESP_ERROR_CHECK(nvs_flash_init());
ESP_ERROR_CHECK(esp_netif_init());
ESP_ERROR_CHECK(esp_event_loop_create_default());
/* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig.
* Read "Establishing Wi-Fi or Ethernet Connection" section in
* examples/protocols/README.md for more information about this function.
*/
ESP_ERROR_CHECK(example_connect());
/* Start the MQTT client */
ESP_LOGI(TAG, "Start connect MQTT");
mqtt_app_start();
vTaskDelay(60000 / portTICK_PERIOD_MS);
ESP_LOGI(TAG, "Disconnecting MQTT");
app_mqtt_stop();
}