feat(mqtt): update mqtt component to esp-mqtt commit id 752953dc and update some example

This commit is contained in:
yuanjm
2020-04-26 18:56:27 +08:00
parent f9bfd84744
commit 2882a6fac2
31 changed files with 696 additions and 357 deletions

View File

@@ -13,6 +13,8 @@ addons:
before_install:
# Save path to the git respository
- PROJECT_PATH=$(pwd)
# Have to checkout a temp branch for later in tree reference
- git checkout -b temporary_ref_branch
- CI_COMMIT_SHA=$(git rev-parse HEAD)
install:

View File

@@ -45,6 +45,7 @@ typedef enum {
- current_data_offset offset of the current data for this event
- total_data_len total length of the data received
*/
MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */
} esp_mqtt_event_id_t;
typedef enum {
@@ -103,16 +104,19 @@ typedef struct {
const char *client_cert_pem; /*!< Pointer to certificate data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_key_pem` has to be provided. */
const char *client_key_pem; /*!< Pointer to private key data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_cert_pem` has to be provided. */
esp_mqtt_transport_t transport; /*!< overrides URI transport */
int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */
} esp_mqtt_client_config_t;
esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config);
esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri);
esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client);
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client);
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client);
esp_err_t esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos);
esp_err_t esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic);
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client);
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);
#ifdef __cplusplus
}

View File

@@ -10,6 +10,7 @@
#define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311
#define MQTT_RECONNECT_TIMEOUT_MS (10*1000)
#define MQTT_POLL_READ_TIMEOUT_MS (1000)
#if CONFIG_MQTT_BUFFER_SIZE
#define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE
@@ -61,14 +62,18 @@
#define MQTT_CORE_SELECTION_ENABLED CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED
#ifdef CONFIG_MQTT_DISABLE_API_LOCKS
#define MQTT_DISABLE_API_LOCKS CONFIG_MQTT_DISABLE_API_LOCKS
#endif
#ifdef CONFIG_MQTT_USE_CORE_0
#define MQTT_TASK_CORE 0
#define MQTT_TASK_CORE 0
#else
#ifdef CONFIG_MQTT_USE_CORE_1
#define MQTT_TASK_CORE 1
#else
#define MQTT_TASK_CORE 0
#endif
#ifdef CONFIG_MQTT_USE_CORE_1
#define MQTT_TASK_CORE 1
#else
#define MQTT_TASK_CORE 0
#endif
#endif

View File

@@ -72,7 +72,8 @@ typedef struct mqtt_message
{
uint8_t* data;
uint32_t length;
uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */
uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */
} mqtt_message_t;
typedef struct mqtt_connection
@@ -105,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >>
static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; }
static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; }
static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; }
static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }

View File

@@ -15,17 +15,35 @@ struct outbox_item;
typedef struct outbox_list_t * outbox_handle_t;
typedef struct outbox_item * outbox_item_handle_t;
typedef struct outbox_message * outbox_message_handle_t;
typedef struct outbox_message {
uint8_t *data;
int len;
int msg_id;
int msg_qos;
int msg_type;
uint8_t *remaining_data;
int remaining_len;
} outbox_message_t;
typedef enum pending_state {
QUEUED,
TRANSMITTED,
CONFIRMED
} pending_state_t;
outbox_handle_t outbox_init();
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox);
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick);
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending);
outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id);
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
int outbox_get_size(outbox_handle_t outbox);
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size);
void outbox_destroy(outbox_handle_t outbox);

View File

@@ -34,7 +34,7 @@
#include "mqtt_config.h"
#include "platform.h"
#define MQTT_MAX_FIXED_HEADER_SIZE 3
#define MQTT_MAX_FIXED_HEADER_SIZE 5
enum mqtt_connect_flag
{
@@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection)
static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
if (remaining_length > 127)
{
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[1] = 0x80 | (remaining_length % 128);
connection->buffer[2] = remaining_length / 128;
connection->message.length = remaining_length + 3;
connection->message.data = connection->buffer;
int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
int total_length = message_length;
int encoded_length = 0;
uint8_t encoded_lens[4] = {0};
// Check if we have fragmented message and update total_len
if (connection->message.fragmented_msg_total_length) {
total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE;
}
else
{
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[2] = remaining_length;
connection->message.length = remaining_length + 2;
connection->message.data = connection->buffer + 1;
// Encode MQTT message length
int len_bytes = 0; // size of encoded message length
do {
encoded_length = total_length % 128;
total_length /= 128;
if (total_length > 0) {
encoded_length |= 0x80;
}
encoded_lens[len_bytes] = encoded_length;
len_bytes++;
} while (total_length > 0);
// Sanity check for MQTT header
if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) {
return fail_message(connection);
}
// Save the header bytes
connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte)
int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes;
connection->message.data = connection->buffer + offs;
connection->message.fragmented_msg_data_offset -= offs;
// type byte
connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
// length bytes
for (int j = 0; j<len_bytes; j++) {
connection->buffer[offs++] = encoded_lens[j];
}
return &connection->message;
@@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi
else
*message_id = 0;
if (connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
if (connection->message.length + data_length > connection->buffer_length) {
// Not enough size in buffer -> fragment this message
connection->message.fragmented_msg_data_offset = connection->message.length;
memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length);
connection->message.length = connection->buffer_length;
connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset;
} else {
memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
connection->message.fragmented_msg_total_length = 0;
}
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}

View File

@@ -14,9 +14,10 @@ typedef struct outbox_item {
int len;
int msg_id;
int msg_type;
int msg_qos;
int tick;
int retry_count;
bool pending;
pending_state_t pending;
STAILQ_ENTRY(outbox_item) next;
} outbox_item_t;
@@ -31,22 +32,27 @@ outbox_handle_t outbox_init()
return outbox;
}
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick)
outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick)
{
outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
ESP_MEM_CHECK(TAG, item, return NULL);
item->msg_id = msg_id;
item->msg_type = msg_type;
item->msg_id = message->msg_id;
item->msg_type = message->msg_type;
item->msg_qos = message->msg_qos;
item->tick = tick;
item->len = len;
item->buffer = malloc(len);
item->len = message->len;
item->pending = QUEUED;
item->buffer = malloc(message->len + message->remaining_len);
ESP_MEM_CHECK(TAG, item->buffer, {
free(item);
return NULL;
});
memcpy(item->buffer, data, len);
memcpy(item->buffer, message->data, message->len);
if (message->remaining_data) {
memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len);
}
STAILQ_INSERT_TAIL(outbox, item, next);
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox));
ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
return item;
}
@@ -61,21 +67,34 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
return NULL;
}
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox)
outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending)
{
outbox_item_handle_t item;
STAILQ_FOREACH(item, outbox, next) {
if (!item->pending) {
if (item->pending == pending) {
return item;
}
}
return NULL;
}
uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
{
if (item) {
*len = item->len;
*msg_id = item->msg_id;
*msg_type = item->msg_type;
*qos = item->msg_qos;
return (uint8_t*)item->buffer;
}
return NULL;
}
esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
{
outbox_item_handle_t item, tmp;
STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
if (item->msg_id == msg_id && item->msg_type == msg_type) {
if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) {
STAILQ_REMOVE(outbox, item, outbox_item, next);
free(item->buffer);
free(item);
@@ -99,11 +118,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
}
return ESP_OK;
}
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id)
esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
{
outbox_item_handle_t item = outbox_get(outbox, msg_id);
if (item) {
item->pending = true;
item->pending = pending;
return ESP_OK;
}
return ESP_FAIL;
@@ -150,7 +169,7 @@ int outbox_get_size(outbox_handle_t outbox)
esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size)
{
while(outbox_get_size(outbox) > max_size) {
outbox_item_handle_t item = outbox_dequeue(outbox);
outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED);
if (item == NULL) {
return ESP_FAIL;
}

View File

@@ -13,6 +13,18 @@
/* using uri parser */
#include "http_parser.h"
#ifdef MQTT_DISABLE_API_LOCKS
# define MQTT_API_LOCK(c)
# define MQTT_API_UNLOCK(c)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c)
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c)
#else
# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY)
# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock)
# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } }
# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } }
#endif /* MQTT_USE_API_LOCKS */
static const char *TAG = "MQTT_CLIENT";
typedef struct mqtt_state
@@ -44,6 +56,7 @@ typedef struct {
bool auto_reconnect;
void *user_context;
int network_timeout_ms;
int refresh_connection_after_ms;
} mqtt_config_storage_t;
typedef enum {
@@ -61,8 +74,9 @@ struct esp_mqtt_client {
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
mqtt_client_state_t state;
long long keepalive_tick;
long long reconnect_tick;
uint64_t refresh_connection_tick;
uint64_t keepalive_tick;
uint64_t reconnect_tick;
int wait_timeout_ms;
int auto_reconnect;
esp_mqtt_event_t event;
@@ -70,105 +84,145 @@ struct esp_mqtt_client {
bool wait_for_ping_resp;
outbox_handle_t outbox;
EventGroupHandle_t status_bits;
SemaphoreHandle_t api_lock;
TaskHandle_t task_handle;
};
const static int STOPPED_BIT = BIT0;
const static int RECONNECT_BIT = BIT1;
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config);
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms);
static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client);
static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client);
static char *create_string(const char *ptr, int len);
static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config)
{
MQTT_API_LOCK(client);
//Copy user configurations to client context
esp_err_t err = ESP_OK;
mqtt_config_storage_t *cfg = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM);
mqtt_config_storage_t *cfg;
if (client->config) {
cfg = client->config;
} else {
cfg = calloc(1, sizeof(mqtt_config_storage_t));
ESP_MEM_CHECK(TAG, cfg, {
MQTT_API_UNLOCK(client);
return ESP_ERR_NO_MEM;
});
client->config = cfg;
}
if (config->task_prio) {
cfg->task_prio = config->task_prio;
}
client->config = cfg;
cfg->task_prio = config->task_prio;
if (cfg->task_prio <= 0) {
cfg->task_prio = MQTT_TASK_PRIORITY;
}
cfg->task_stack = config->task_stack;
if (config->task_stack) {
cfg->task_stack = config->task_stack;
}
if (cfg->task_stack == 0) {
cfg->task_stack = MQTT_TASK_STACK;
}
if (config->port) {
cfg->port = config->port;
}
err = ESP_ERR_NO_MEM;
if (config->host) {
free(cfg->host);
cfg->host = strdup(config->host);
ESP_MEM_CHECK(TAG, cfg->host, goto _mqtt_set_config_failed);
}
cfg->port = config->port;
if (config->username) {
free(client->connect_info.username);
client->connect_info.username = strdup(config->username);
ESP_MEM_CHECK(TAG, client->connect_info.username, goto _mqtt_set_config_failed);
}
if (config->password) {
free(client->connect_info.password);
client->connect_info.password = strdup(config->password);
ESP_MEM_CHECK(TAG, client->connect_info.password, goto _mqtt_set_config_failed);
}
if (config->client_id) {
free(client->connect_info.client_id);
client->connect_info.client_id = strdup(config->client_id);
} else {
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
} else if (client->connect_info.client_id == NULL) {
client->connect_info.client_id = platform_create_id_string();
}
ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed);
ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id);
if (config->uri) {
free(cfg->uri);
cfg->uri = strdup(config->uri);
ESP_MEM_CHECK(TAG, cfg->uri, goto _mqtt_set_config_failed);
}
if (config->lwt_topic) {
free(client->connect_info.will_topic);
client->connect_info.will_topic = strdup(config->lwt_topic);
ESP_MEM_CHECK(TAG, client->connect_info.will_topic, goto _mqtt_set_config_failed);
}
if (config->lwt_msg_len) {
if (config->lwt_msg_len && config->lwt_msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = malloc(config->lwt_msg_len);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len);
client->connect_info.will_length = config->lwt_msg_len;
} else if (config->lwt_msg) {
free(client->connect_info.will_message);
client->connect_info.will_message = strdup(config->lwt_msg);
ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed);
client->connect_info.will_length = strlen(config->lwt_msg);
}
client->connect_info.will_qos = config->lwt_qos;
client->connect_info.will_retain = config->lwt_retain;
client->connect_info.clean_session = 1;
if (config->disable_clean_session) {
client->connect_info.clean_session = false;
if (config->lwt_qos) {
client->connect_info.will_qos = config->lwt_qos;
}
if (config->lwt_retain) {
client->connect_info.will_retain = config->lwt_retain;
}
if (config->disable_clean_session == client->connect_info.clean_session) {
client->connect_info.clean_session = !config->disable_clean_session;
}
if (config->keepalive) {
client->connect_info.keepalive = config->keepalive;
}
client->connect_info.keepalive = config->keepalive;
if (client->connect_info.keepalive == 0) {
client->connect_info.keepalive = MQTT_KEEPALIVE_TICK;
}
cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS;
cfg->user_context = config->user_context;
cfg->event_handle = config->event_handle;
cfg->auto_reconnect = true;
if (config->disable_auto_reconnect) {
cfg->auto_reconnect = false;
if (config->user_context) {
cfg->user_context = config->user_context;
}
if (config->event_handle) {
cfg->event_handle = config->event_handle;
}
return err;
if (config->refresh_connection_after_ms) {
cfg->refresh_connection_after_ms = config->refresh_connection_after_ms;
}
cfg->auto_reconnect = true;
if (config->disable_auto_reconnect == cfg->auto_reconnect) {
cfg->auto_reconnect = !config->disable_auto_reconnect;
}
MQTT_API_UNLOCK(client);
return ESP_OK;
_mqtt_set_config_failed:
esp_mqtt_destroy_config(client);
MQTT_API_UNLOCK(client);
return err;
}
@@ -179,11 +233,13 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client)
free(cfg->uri);
free(cfg->path);
free(cfg->scheme);
memset(cfg, 0, sizeof(mqtt_config_storage_t));
free(client->connect_info.will_topic);
free(client->connect_info.will_message);
free(client->connect_info.client_id);
free(client->connect_info.username);
free(client->connect_info.password);
memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t));
free(client->config);
return ESP_OK;
}
@@ -255,10 +311,10 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client)
client->wait_timeout_ms = MQTT_RECONNECT_TIMEOUT_MS;
client->reconnect_tick = platform_tick_get_ms();
client->state = MQTT_STATE_WAIT_TIMEOUT;
ESP_LOGI(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms);
client->event.event_id = MQTT_EVENT_DISCONNECTED;
client->wait_for_ping_resp = false;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
return ESP_OK;
}
@@ -266,9 +322,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
{
esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client));
ESP_MEM_CHECK(TAG, client, return NULL);
client->api_lock = xSemaphoreCreateMutex();
if (!client->api_lock) {
free(client);
return NULL;
}
esp_mqtt_set_config(client, config);
MQTT_API_LOCK(client);
client->transport_list = esp_transport_list_init();
ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed);
@@ -328,13 +388,9 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
}
}
if (client->config->scheme == NULL) {
client->config->scheme = create_string("mqtt", 4);
ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed);
}
client->keepalive_tick = platform_tick_get_ms();
client->reconnect_tick = platform_tick_get_ms();
client->refresh_connection_tick = platform_tick_get_ms();
client->wait_for_ping_resp = false;
int buffer_size = config->buffer_size;
if (buffer_size <= 0) {
@@ -353,9 +409,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co
ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed);
client->status_bits = xEventGroupCreate();
ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed);
MQTT_API_UNLOCK(client);
return client;
_mqtt_init_failed:
esp_mqtt_client_destroy(client);
MQTT_API_UNLOCK(client);
return NULL;
}
@@ -368,6 +426,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client)
vEventGroupDelete(client->status_bits);
free(client->mqtt_state.in_buffer);
free(client->mqtt_state.out_buffer);
vSemaphoreDelete(client->api_lock);
free(client);
return ESP_OK;
}
@@ -394,19 +453,16 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
return ESP_FAIL;
}
if (client->config->scheme == NULL) {
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
}
// set uri overrides actual scheme, host, path if configured previously
free(client->config->scheme);
free(client->config->host);
free(client->config->path);
if (client->config->host == NULL) {
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
}
client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len);
client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len);
client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len);
if (client->config->path == NULL) {
client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len);
}
if (client->config->path) {
#if MQTT_ENABLE_WSS || MQTT_ENABLE_WS
esp_transport_handle_t trans = esp_transport_list_get_transport(client->transport_list, "ws");
if (trans) {
esp_transport_ws_set_path(trans, client->config->path);
@@ -415,7 +471,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u
if (trans) {
esp_transport_ws_set_path(trans, client->config->path);
}
#endif
}
if (puri.field_data[UF_PORT].len) {
@@ -456,9 +511,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client)
return ESP_OK;
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client)
{
client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
return esp_mqtt_dispatch_event(client);
}
static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
{
client->event.user_context = client->config->user_context;
client->event.client = client;
@@ -468,32 +528,37 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client)
return ESP_FAIL;
}
typedef struct {
char *path;
char *buffer;
esp_transport_handle_t parent;
} transport_ws_t;
static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length)
{
const char *mqtt_topic, *mqtt_data;
const char *mqtt_topic = NULL, *mqtt_data = NULL;
uint32_t mqtt_topic_length, mqtt_data_length;
uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0;
int len_read;
uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0;
int len_read= length;
int max_to_read = client->mqtt_state.in_buffer_length;
int buffer_offset = 0;
esp_transport_handle_t transport = client->transport;
do
{
if (total_mqtt_len == 0) {
mqtt_topic_length = length;
mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length);
mqtt_data_length = length;
mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length);
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
/* any further reading only the underlying payload */
transport = esp_transport_get_payload_transport_handle(transport);
mqtt_data_length = mqtt_topic_length = length;
if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) ||
NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) {
// mqtt header is not complete, continue reading
memmove(client->mqtt_state.in_buffer, message, length);
buffer_offset = length;
message = client->mqtt_state.in_buffer;
max_to_read = client->mqtt_state.in_buffer_length - length;
mqtt_len = 0;
} else {
total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length;
mqtt_len = mqtt_data_length;
mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message);
/* read msg id only once */
client->event.msg_id = mqtt_get_id(message, length);
}
} else {
mqtt_len = len_read;
mqtt_data = (const char*)client->mqtt_state.in_buffer;
@@ -501,15 +566,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
mqtt_topic_length = 0;
}
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = total_mqtt_len;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
if (total_mqtt_len != 0) {
ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length);
client->event.event_id = MQTT_EVENT_DATA;
client->event.data = (char *)mqtt_data;
client->event.data_len = mqtt_len;
client->event.total_data_len = mqtt_data_length;
client->event.current_data_offset = mqtt_offset;
client->event.topic = (char *)mqtt_topic;
client->event.topic_len = mqtt_topic_length;
esp_mqtt_dispatch_event(client);
}
mqtt_offset += mqtt_len;
if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) {
@@ -517,18 +584,20 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i
}
len_read = esp_transport_read(transport,
(char *)client->mqtt_state.in_buffer,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ?
client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
(char *)client->mqtt_state.in_buffer + buffer_offset,
client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ?
max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read,
client->config->network_timeout_ms);
length = len_read + buffer_offset;
buffer_offset = 0;
max_to_read = client->mqtt_state.in_buffer_length;
if (len_read <= 0) {
ESP_LOGE(TAG, "Read error or timeout: %d", errno);
ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno);
break;
}
client->mqtt_state.message_length_read += len_read;
} while (1);
}
static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id)
@@ -549,19 +618,42 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int
return false;
}
static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len)
{
ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
outbox_message_t msg = { 0 };
if (client->mqtt_state.pending_msg_count > 0) {
client->mqtt_state.pending_msg_count --;
}
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
msg.remaining_data = remaining_data;
msg.remaining_len = remaining_len;
//Copy to queue buffer
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
//unlock
}
static void mqtt_enqueue(esp_mqtt_client_handle_t client)
{
ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful",
client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
//lock mutex
if (client->mqtt_state.pending_msg_count > 0) {
outbox_message_t msg = { 0 };
msg.data = client->mqtt_state.outbound_message->data;
msg.len = client->mqtt_state.outbound_message->length;
msg.msg_id = client->mqtt_state.pending_msg_id;
msg.msg_type = client->mqtt_state.pending_msg_type;
msg.msg_qos = client->mqtt_state.pending_publish_qos;
//Copy to queue buffer
outbox_enqueue(client->outbox,
client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length,
client->mqtt_state.pending_msg_id,
client->mqtt_state.pending_msg_type,
platform_tick_get_ms());
outbox_enqueue(client->outbox, &msg, platform_tick_get_ms());
}
//unlock
}
@@ -572,8 +664,9 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
uint32_t transport_message_offset = 0 ;
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000);
read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0);
if (read_len < 0) {
ESP_LOGE(TAG, "Read error or end of stream");
@@ -584,89 +677,116 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client)
return ESP_OK;
}
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
// In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here
while ( transport_message_offset < read_len ) {
// If the message was valid, get the type, quality of service and id of the message
msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]);
msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset);
client->mqtt_state.message_length_read = read_len - transport_message_offset;
client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event(client);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
}
else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id);
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
// TODO: Shoule reconnect?
// return ESP_FAIL;
switch (msg_type)
{
case MQTT_MSG_TYPE_SUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "Subscribe successful");
client->event.event_id = MQTT_EVENT_SUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) {
ESP_LOGD(TAG, "UnSubscribe successful");
client->event.event_id = MQTT_EVENT_UNSUBSCRIBED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1) {
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
}
else if (msg_qos == 2) {
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
}
}
client->mqtt_state.message_length_read = read_len;
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length);
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client);
}
if (msg_qos == 1 || msg_qos == 2) {
ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos);
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos);
// TODO: Shoule reconnect?
// return ESP_FAIL;
}
}
// Deliver the publish message
ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish");
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PUBREC:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC");
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
outbox_set_pending(client->outbox, msg_id, CONFIRMED);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBREL:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL");
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
mqtt_write_data(client);
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event_with_msgid(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
}
break;
case MQTT_MSG_TYPE_PUBCOMP:
ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP");
if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) {
ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish");
client->event.event_id = MQTT_EVENT_PUBLISHED;
esp_mqtt_dispatch_event(client);
}
break;
case MQTT_MSG_TYPE_PINGRESP:
ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP");
client->wait_for_ping_resp = false;
break;
transport_message_offset += client->mqtt_state.message_length;
}
return ESP_OK;
}
static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item)
{
// decode queued data
client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id,
&client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos);
// set duplicate flag for QoS-2 message
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) {
mqtt_set_dup(client->mqtt_state.outbound_message->data);
}
// try to resend the data
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data ");
esp_mqtt_abort_connection(client);
return ESP_FAIL;
}
return ESP_OK;
}
static void esp_mqtt_task(void *pv)
{
esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv;
uint32_t last_retransmit = 0;
client->run = true;
//get transport by scheme
@@ -684,9 +804,13 @@ static void esp_mqtt_task(void *pv)
client->state = MQTT_STATE_INIT;
xEventGroupClearBits(client->status_bits, STOPPED_BIT);
while (client->run) {
MQTT_API_LOCK(client);
switch ((int)client->state) {
case MQTT_STATE_INIT:
xEventGroupClearBits(client->status_bits, RECONNECT_BIT);
client->event.event_id = MQTT_EVENT_BEFORE_CONNECT;
esp_mqtt_dispatch_event_with_msgid(client);
if (client->transport == NULL) {
ESP_LOGE(TAG, "There are no transport");
client->run = false;
@@ -709,7 +833,8 @@ static void esp_mqtt_task(void *pv)
client->event.event_id = MQTT_EVENT_CONNECTED;
client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer);
client->state = MQTT_STATE_CONNECTED;
esp_mqtt_dispatch_event(client);
esp_mqtt_dispatch_event_with_msgid(client);
client->refresh_connection_tick = platform_tick_get_ms();
break;
case MQTT_STATE_CONNECTED:
@@ -719,6 +844,21 @@ static void esp_mqtt_task(void *pv)
break;
}
// resend all non-transmitted messages first
outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED);
if (item) {
if (mqtt_resend_queued(client, item) == ESP_OK) {
outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED);
}
// resend other "transmitted" messages after 1s
} else if (platform_tick_get_ms() - last_retransmit > 1000) {
last_retransmit = platform_tick_get_ms();
item = outbox_dequeue(client->outbox, TRANSMITTED);
if (item) {
mqtt_resend_queued(client, item);
}
}
if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) {
//No ping resp from last ping => Disconnected
if(client->wait_for_ping_resp){
@@ -737,6 +877,13 @@ static void esp_mqtt_task(void *pv)
ESP_LOGD(TAG, "PING sent");
}
if (client->config->refresh_connection_after_ms &&
platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) {
ESP_LOGD(TAG, "Refreshing the connection...");
esp_mqtt_abort_connection(client);
client->state = MQTT_STATE_INIT;
}
//Delete mesaage after 30 senconds
outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS);
//
@@ -752,10 +899,22 @@ static void esp_mqtt_task(void *pv)
client->state = MQTT_STATE_INIT;
client->reconnect_tick = platform_tick_get_ms();
ESP_LOGD(TAG, "Reconnecting...");
break;
}
vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS);
break;
MQTT_API_UNLOCK(client);
xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true,
client->wait_timeout_ms / 2 / portTICK_RATE_MS);
// continue the while loop insted of break, as the mutex is unlocked
continue;
}
MQTT_API_UNLOCK(client);
if (MQTT_STATE_CONNECTED == client->state) {
if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) {
ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
esp_mqtt_abort_connection(client);
}
}
}
esp_transport_close(client->transport);
xEventGroupSetBits(client->status_bits, STOPPED_BIT);
@@ -771,13 +930,13 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
}
#if MQTT_CORE_SELECTION_ENABLED
ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE);
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL, MQTT_TASK_CORE) != pdTRUE) {
if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
}
#else
ESP_LOGD(TAG, "Core selection disabled");
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL) != pdTRUE) {
if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) {
ESP_LOGE(TAG, "Error create mqtt task");
return ESP_FAIL;
}
@@ -785,6 +944,18 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client)
return ESP_OK;
}
esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client)
{
ESP_LOGI(TAG, "Client force reconnect requested");
if (client->state != MQTT_STATE_WAIT_TIMEOUT) {
ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request");
return ESP_FAIL;
}
client->wait_timeout_ms = 0;
xEventGroupSetBits(client->status_bits, RECONNECT_BIT);
return ESP_OK;
}
esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client)
{
if (client->run) {
@@ -816,20 +987,23 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
mqtt_enqueue(client); //move pending msg to outbox (if have)
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos,
&client->mqtt_state.pending_msg_id);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client); //move pending msg to outbox (if have)
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return -1;
}
ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return client->mqtt_state.pending_msg_id;
}
@@ -839,7 +1013,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
mqtt_enqueue(client);
MQTT_API_LOCK_FROM_OTHER_TASK(client);
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
@@ -847,27 +1021,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_count ++;
mqtt_enqueue(client);
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return -1;
}
ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id);
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return client->mqtt_state.pending_msg_id;
}
int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain)
{
uint16_t pending_msg_id = 0;
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGE(TAG, "Client has not connected");
return -1;
}
if (len <= 0) {
len = strlen(data);
}
MQTT_API_LOCK_FROM_OTHER_TASK(client);
mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, len,
qos, retain,
@@ -875,20 +1050,82 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic,
/* We have to set as pending all the qos>0 messages) */
if (qos > 0) {
mqtt_enqueue(client);
client->mqtt_state.outbound_message = publish_msg;
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id = pending_msg_id;
client->mqtt_state.pending_publish_qos = qos;
client->mqtt_state.pending_msg_count ++;
// by default store as QUEUED (not transmitted yet)
mqtt_enqueue(client);
} else {
client->mqtt_state.outbound_message = publish_msg;
}
if (mqtt_write_data(client) != ESP_OK) {
ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos);
return -1;
/* Skip sending if not connected (rely on resending) */
if (client->state != MQTT_STATE_CONNECTED) {
ESP_LOGD(TAG, "Publish: client is not connected");
goto cannot_publish;
}
/* Provide support for sending fragmented message if it doesn't fit buffer */
int remaining_len = len;
const char *current_data = data;
bool sending = true;
while (sending) {
if (mqtt_write_data(client) != ESP_OK) {
esp_mqtt_abort_connection(client);
goto cannot_publish;
}
int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset;
remaining_len -= data_sent;
current_data += data_sent;
if (remaining_len > 0) {
mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection;
ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len);
if (connection->message.fragmented_msg_data_offset) {
// asked to enqueue oversized message (first time only)
connection->message.fragmented_msg_data_offset = 0;
connection->message.fragmented_msg_total_length = 0;
if (qos > 0) {
// internally enqueue all big messages, as they dont fit 'pending msg' structure
mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len);
}
}
if (remaining_len > connection->buffer_length) {
// Continue with sending
memcpy(connection->buffer, current_data, connection->buffer_length);
connection->message.length = connection->buffer_length;
sending = true;
} else {
memcpy(connection->buffer, current_data, remaining_len);
connection->message.length = remaining_len;
sending = true;
}
connection->message.data = connection->buffer;
client->mqtt_state.outbound_message = &connection->message;
} else {
// Message was sent correctly
sending = false;
}
}
if (qos > 0) {
outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED);
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return pending_msg_id;
cannot_publish:
if (qos == 0) {
ESP_LOGW(TAG, "Publish: Loosing qos0 data when client not connected");
}
MQTT_API_UNLOCK_FROM_OTHER_TASK(client);
return 0;
}