From 2882a6fac2f1288aaa44234c0ccbf900df4f5ecc Mon Sep 17 00:00:00 2001 From: yuanjm Date: Sun, 26 Apr 2020 18:56:27 +0800 Subject: [PATCH] feat(mqtt): update mqtt component to esp-mqtt commit id 752953dc and update some example --- components/mqtt/esp-mqtt/.travis.yml | 2 + .../mqtt/esp-mqtt/include/mqtt_client.h | 4 + .../mqtt/esp-mqtt/include/mqtt_config.h | 17 +- .../mqtt/esp-mqtt/lib/include/mqtt_msg.h | 4 +- .../mqtt/esp-mqtt/lib/include/mqtt_outbox.h | 24 +- components/mqtt/esp-mqtt/lib/mqtt_msg.c | 68 +- components/mqtt/esp-mqtt/lib/mqtt_outbox.c | 47 +- components/mqtt/esp-mqtt/mqtt_client.c | 579 ++++++++++++------ .../protocols/esp-mqtt/ssl/CMakeLists.txt | 2 +- examples/protocols/esp-mqtt/ssl/README.md | 6 +- .../esp-mqtt/ssl/main/Kconfig.projbuild | 47 +- .../protocols/esp-mqtt/ssl/main/app_main.c | 14 +- .../protocols/esp-mqtt/ssl/main/component.mk | 2 +- ...t_eclipse_org.pem => mqtt_eclipse_org.pem} | 0 .../esp-mqtt/ssl/mqtt_ssl_example_test.py | 5 +- .../ssl_mutual_auth/main/Kconfig.projbuild | 20 +- .../esp-mqtt/ssl_mutual_auth/main/app_main.c | 3 + .../esp-mqtt/tcp/main/Kconfig.projbuild | 36 +- .../protocols/esp-mqtt/tcp/main/app_main.c | 3 + .../esp-mqtt/tcp/mqtt_tcp_example_test.py | 60 +- examples/protocols/esp-mqtt/ws/README.md | 2 +- .../esp-mqtt/ws/main/Kconfig.projbuild | 30 +- .../protocols/esp-mqtt/ws/main/app_main.c | 3 + .../esp-mqtt/ws/mqtt_ws_example_test.py | 2 +- .../protocols/esp-mqtt/wss/CMakeLists.txt | 2 +- examples/protocols/esp-mqtt/wss/README.md | 6 +- .../esp-mqtt/wss/main/Kconfig.projbuild | 47 +- .../protocols/esp-mqtt/wss/main/app_main.c | 11 +- .../protocols/esp-mqtt/wss/main/component.mk | 2 +- ...t_eclipse_org.pem => mqtt_eclipse_org.pem} | 0 .../esp-mqtt/wss/mqtt_wss_example_test.py | 5 +- 31 files changed, 696 insertions(+), 357 deletions(-) rename examples/protocols/esp-mqtt/ssl/main/{iot_eclipse_org.pem => mqtt_eclipse_org.pem} (100%) rename examples/protocols/esp-mqtt/wss/main/{iot_eclipse_org.pem => mqtt_eclipse_org.pem} (100%) diff --git a/components/mqtt/esp-mqtt/.travis.yml b/components/mqtt/esp-mqtt/.travis.yml index 0b8a4599..521d2930 100644 --- a/components/mqtt/esp-mqtt/.travis.yml +++ b/components/mqtt/esp-mqtt/.travis.yml @@ -13,6 +13,8 @@ addons: before_install: # Save path to the git respository - PROJECT_PATH=$(pwd) + # Have to checkout a temp branch for later in tree reference + - git checkout -b temporary_ref_branch - CI_COMMIT_SHA=$(git rev-parse HEAD) install: diff --git a/components/mqtt/esp-mqtt/include/mqtt_client.h b/components/mqtt/esp-mqtt/include/mqtt_client.h index 630ba172..e4e11d42 100755 --- a/components/mqtt/esp-mqtt/include/mqtt_client.h +++ b/components/mqtt/esp-mqtt/include/mqtt_client.h @@ -45,6 +45,7 @@ typedef enum { - current_data_offset offset of the current data for this event - total_data_len total length of the data received */ + MQTT_EVENT_BEFORE_CONNECT, /*!< The event occurs before connecting */ } esp_mqtt_event_id_t; typedef enum { @@ -103,16 +104,19 @@ typedef struct { const char *client_cert_pem; /*!< Pointer to certificate data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_key_pem` has to be provided. */ const char *client_key_pem; /*!< Pointer to private key data in PEM format for SSL mutual authentication, default is NULL, not required if mutual authentication is not needed. If it is not NULL, also `client_cert_pem` has to be provided. */ esp_mqtt_transport_t transport; /*!< overrides URI transport */ + int refresh_connection_after_ms; /*!< Refresh connection after this value (in milliseconds) */ } esp_mqtt_client_config_t; esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *config); esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *uri); esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client); +esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client); esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client); esp_err_t esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic, int qos); esp_err_t esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *topic); int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain); esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client); +esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config); #ifdef __cplusplus } diff --git a/components/mqtt/esp-mqtt/include/mqtt_config.h b/components/mqtt/esp-mqtt/include/mqtt_config.h index c0b4ab75..972f2a1c 100644 --- a/components/mqtt/esp-mqtt/include/mqtt_config.h +++ b/components/mqtt/esp-mqtt/include/mqtt_config.h @@ -10,6 +10,7 @@ #define MQTT_PROTOCOL_311 CONFIG_MQTT_PROTOCOL_311 #define MQTT_RECONNECT_TIMEOUT_MS (10*1000) +#define MQTT_POLL_READ_TIMEOUT_MS (1000) #if CONFIG_MQTT_BUFFER_SIZE #define MQTT_BUFFER_SIZE_BYTE CONFIG_MQTT_BUFFER_SIZE @@ -61,14 +62,18 @@ #define MQTT_CORE_SELECTION_ENABLED CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED +#ifdef CONFIG_MQTT_DISABLE_API_LOCKS +#define MQTT_DISABLE_API_LOCKS CONFIG_MQTT_DISABLE_API_LOCKS +#endif + #ifdef CONFIG_MQTT_USE_CORE_0 - #define MQTT_TASK_CORE 0 + #define MQTT_TASK_CORE 0 #else - #ifdef CONFIG_MQTT_USE_CORE_1 - #define MQTT_TASK_CORE 1 - #else - #define MQTT_TASK_CORE 0 - #endif + #ifdef CONFIG_MQTT_USE_CORE_1 + #define MQTT_TASK_CORE 1 + #else + #define MQTT_TASK_CORE 0 + #endif #endif diff --git a/components/mqtt/esp-mqtt/lib/include/mqtt_msg.h b/components/mqtt/esp-mqtt/lib/include/mqtt_msg.h index 049c738c..1a07cae5 100644 --- a/components/mqtt/esp-mqtt/lib/include/mqtt_msg.h +++ b/components/mqtt/esp-mqtt/lib/include/mqtt_msg.h @@ -72,7 +72,8 @@ typedef struct mqtt_message { uint8_t* data; uint32_t length; - + uint32_t fragmented_msg_total_length; /*!< total len of fragmented messages (zero for all other messages) */ + uint32_t fragmented_msg_data_offset; /*!< data offset of fragmented messages (zero for all other messages) */ } mqtt_message_t; typedef struct mqtt_connection @@ -105,6 +106,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> static inline int mqtt_get_connect_session_present(uint8_t* buffer) { return buffer[2] & 0x01; } static inline int mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; } static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline void mqtt_set_dup(uint8_t* buffer) { buffer[0] |= 0x08; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } diff --git a/components/mqtt/esp-mqtt/lib/include/mqtt_outbox.h b/components/mqtt/esp-mqtt/lib/include/mqtt_outbox.h index a87b4662..b4f1e4f9 100644 --- a/components/mqtt/esp-mqtt/lib/include/mqtt_outbox.h +++ b/components/mqtt/esp-mqtt/lib/include/mqtt_outbox.h @@ -15,17 +15,35 @@ struct outbox_item; typedef struct outbox_list_t * outbox_handle_t; typedef struct outbox_item * outbox_item_handle_t; +typedef struct outbox_message * outbox_message_handle_t; + +typedef struct outbox_message { + uint8_t *data; + int len; + int msg_id; + int msg_qos; + int msg_type; + uint8_t *remaining_data; + int remaining_len; +} outbox_message_t; + +typedef enum pending_state { + QUEUED, + TRANSMITTED, + CONFIRMED +} pending_state_t; outbox_handle_t outbox_init(); -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick); -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox); +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick); +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending); outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id); +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos); esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type); esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id); esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type); esp_err_t outbox_delete_expired(outbox_handle_t outbox, int current_tick, int timeout); -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id); +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending); int outbox_get_size(outbox_handle_t outbox); esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size); void outbox_destroy(outbox_handle_t outbox); diff --git a/components/mqtt/esp-mqtt/lib/mqtt_msg.c b/components/mqtt/esp-mqtt/lib/mqtt_msg.c index eb978472..15eb93e6 100644 --- a/components/mqtt/esp-mqtt/lib/mqtt_msg.c +++ b/components/mqtt/esp-mqtt/lib/mqtt_msg.c @@ -34,7 +34,7 @@ #include "mqtt_config.h" #include "platform.h" -#define MQTT_MAX_FIXED_HEADER_SIZE 3 +#define MQTT_MAX_FIXED_HEADER_SIZE 5 enum mqtt_connect_flag { @@ -105,22 +105,42 @@ static mqtt_message_t* fail_message(mqtt_connection_t* connection) static mqtt_message_t* fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) { - int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; - - if (remaining_length > 127) - { - connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[1] = 0x80 | (remaining_length % 128); - connection->buffer[2] = remaining_length / 128; - connection->message.length = remaining_length + 3; - connection->message.data = connection->buffer; + int message_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + int total_length = message_length; + int encoded_length = 0; + uint8_t encoded_lens[4] = {0}; + // Check if we have fragmented message and update total_len + if (connection->message.fragmented_msg_total_length) { + total_length = connection->message.fragmented_msg_total_length - MQTT_MAX_FIXED_HEADER_SIZE; } - else - { - connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); - connection->buffer[2] = remaining_length; - connection->message.length = remaining_length + 2; - connection->message.data = connection->buffer + 1; + + // Encode MQTT message length + int len_bytes = 0; // size of encoded message length + do { + encoded_length = total_length % 128; + total_length /= 128; + if (total_length > 0) { + encoded_length |= 0x80; + } + encoded_lens[len_bytes] = encoded_length; + len_bytes++; + } while (total_length > 0); + + // Sanity check for MQTT header + if (len_bytes + 1 > MQTT_MAX_FIXED_HEADER_SIZE) { + return fail_message(connection); + } + + // Save the header bytes + connection->message.length = message_length + len_bytes + 1; // msg len + encoded_size len + type (1 byte) + int offs = MQTT_MAX_FIXED_HEADER_SIZE - 1 - len_bytes; + connection->message.data = connection->buffer + offs; + connection->message.fragmented_msg_data_offset -= offs; + // type byte + connection->buffer[offs++] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + // length bytes + for (int j = 0; jbuffer[offs++] = encoded_lens[j]; } return &connection->message; @@ -377,11 +397,17 @@ mqtt_message_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topi else *message_id = 0; - if (connection->message.length + data_length > connection->buffer_length) - return fail_message(connection); - memcpy(connection->buffer + connection->message.length, data, data_length); - connection->message.length += data_length; - + if (connection->message.length + data_length > connection->buffer_length) { + // Not enough size in buffer -> fragment this message + connection->message.fragmented_msg_data_offset = connection->message.length; + memcpy(connection->buffer + connection->message.length, data, connection->buffer_length - connection->message.length); + connection->message.length = connection->buffer_length; + connection->message.fragmented_msg_total_length = data_length + connection->message.fragmented_msg_data_offset; + } else { + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + connection->message.fragmented_msg_total_length = 0; + } return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); } diff --git a/components/mqtt/esp-mqtt/lib/mqtt_outbox.c b/components/mqtt/esp-mqtt/lib/mqtt_outbox.c index 300e6987..403fd9c0 100644 --- a/components/mqtt/esp-mqtt/lib/mqtt_outbox.c +++ b/components/mqtt/esp-mqtt/lib/mqtt_outbox.c @@ -14,9 +14,10 @@ typedef struct outbox_item { int len; int msg_id; int msg_type; + int msg_qos; int tick; int retry_count; - bool pending; + pending_state_t pending; STAILQ_ENTRY(outbox_item) next; } outbox_item_t; @@ -31,22 +32,27 @@ outbox_handle_t outbox_init() return outbox; } -outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, uint8_t *data, int len, int msg_id, int msg_type, int tick) +outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, int tick) { outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t)); ESP_MEM_CHECK(TAG, item, return NULL); - item->msg_id = msg_id; - item->msg_type = msg_type; + item->msg_id = message->msg_id; + item->msg_type = message->msg_type; + item->msg_qos = message->msg_qos; item->tick = tick; - item->len = len; - item->buffer = malloc(len); + item->len = message->len; + item->pending = QUEUED; + item->buffer = malloc(message->len + message->remaining_len); ESP_MEM_CHECK(TAG, item->buffer, { free(item); return NULL; }); - memcpy(item->buffer, data, len); + memcpy(item->buffer, message->data, message->len); + if (message->remaining_data) { + memcpy(item->buffer+message->len, message->remaining_data, message->remaining_len); + } STAILQ_INSERT_TAIL(outbox, item, next); - ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", msg_id, msg_type, len, outbox_get_size(outbox)); + ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox)); return item; } @@ -61,21 +67,34 @@ outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id) return NULL; } -outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox) +outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending) { outbox_item_handle_t item; STAILQ_FOREACH(item, outbox, next) { - if (!item->pending) { + if (item->pending == pending) { return item; } } return NULL; } + +uint8_t* outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos) +{ + if (item) { + *len = item->len; + *msg_id = item->msg_id; + *msg_type = item->msg_type; + *qos = item->msg_qos; + return (uint8_t*)item->buffer; + } + return NULL; +} + esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type) { outbox_item_handle_t item, tmp; STAILQ_FOREACH_SAFE(item, outbox, next, tmp) { - if (item->msg_id == msg_id && item->msg_type == msg_type) { + if (item->msg_id == msg_id && (0xFF&(item->msg_type)) == msg_type) { STAILQ_REMOVE(outbox, item, outbox_item, next); free(item->buffer); free(item); @@ -99,11 +118,11 @@ esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id) } return ESP_OK; } -esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id) +esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending) { outbox_item_handle_t item = outbox_get(outbox, msg_id); if (item) { - item->pending = true; + item->pending = pending; return ESP_OK; } return ESP_FAIL; @@ -150,7 +169,7 @@ int outbox_get_size(outbox_handle_t outbox) esp_err_t outbox_cleanup(outbox_handle_t outbox, int max_size) { while(outbox_get_size(outbox) > max_size) { - outbox_item_handle_t item = outbox_dequeue(outbox); + outbox_item_handle_t item = outbox_dequeue(outbox, CONFIRMED); if (item == NULL) { return ESP_FAIL; } diff --git a/components/mqtt/esp-mqtt/mqtt_client.c b/components/mqtt/esp-mqtt/mqtt_client.c index 9768f72d..8fe31cc9 100644 --- a/components/mqtt/esp-mqtt/mqtt_client.c +++ b/components/mqtt/esp-mqtt/mqtt_client.c @@ -13,6 +13,18 @@ /* using uri parser */ #include "http_parser.h" +#ifdef MQTT_DISABLE_API_LOCKS +# define MQTT_API_LOCK(c) +# define MQTT_API_UNLOCK(c) +# define MQTT_API_LOCK_FROM_OTHER_TASK(c) +# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) +#else +# define MQTT_API_LOCK(c) xSemaphoreTake(c->api_lock, portMAX_DELAY) +# define MQTT_API_UNLOCK(c) xSemaphoreGive(c->api_lock) +# define MQTT_API_LOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreTake(c->api_lock, portMAX_DELAY); } } +# define MQTT_API_UNLOCK_FROM_OTHER_TASK(c) { if (c->task_handle != xTaskGetCurrentTaskHandle()) { xSemaphoreGive(c->api_lock); } } +#endif /* MQTT_USE_API_LOCKS */ + static const char *TAG = "MQTT_CLIENT"; typedef struct mqtt_state @@ -44,6 +56,7 @@ typedef struct { bool auto_reconnect; void *user_context; int network_timeout_ms; + int refresh_connection_after_ms; } mqtt_config_storage_t; typedef enum { @@ -61,8 +74,9 @@ struct esp_mqtt_client { mqtt_state_t mqtt_state; mqtt_connect_info_t connect_info; mqtt_client_state_t state; - long long keepalive_tick; - long long reconnect_tick; + uint64_t refresh_connection_tick; + uint64_t keepalive_tick; + uint64_t reconnect_tick; int wait_timeout_ms; int auto_reconnect; esp_mqtt_event_t event; @@ -70,105 +84,145 @@ struct esp_mqtt_client { bool wait_for_ping_resp; outbox_handle_t outbox; EventGroupHandle_t status_bits; + SemaphoreHandle_t api_lock; + TaskHandle_t task_handle; }; const static int STOPPED_BIT = BIT0; +const static int RECONNECT_BIT = BIT1; static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client); -static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config); +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_connect(esp_mqtt_client_handle_t client, int timeout_ms); static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client); static esp_err_t esp_mqtt_client_ping(esp_mqtt_client_handle_t client); static char *create_string(const char *ptr, int len); -static esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) +esp_err_t esp_mqtt_set_config(esp_mqtt_client_handle_t client, const esp_mqtt_client_config_t *config) { + MQTT_API_LOCK(client); //Copy user configurations to client context esp_err_t err = ESP_OK; - mqtt_config_storage_t *cfg = calloc(1, sizeof(mqtt_config_storage_t)); - ESP_MEM_CHECK(TAG, cfg, return ESP_ERR_NO_MEM); + mqtt_config_storage_t *cfg; + if (client->config) { + cfg = client->config; + } else { + cfg = calloc(1, sizeof(mqtt_config_storage_t)); + ESP_MEM_CHECK(TAG, cfg, { + MQTT_API_UNLOCK(client); + return ESP_ERR_NO_MEM; + }); + client->config = cfg; + } + if (config->task_prio) { + cfg->task_prio = config->task_prio; + } - client->config = cfg; - - cfg->task_prio = config->task_prio; if (cfg->task_prio <= 0) { cfg->task_prio = MQTT_TASK_PRIORITY; } - - cfg->task_stack = config->task_stack; + if (config->task_stack) { + cfg->task_stack = config->task_stack; + } if (cfg->task_stack == 0) { cfg->task_stack = MQTT_TASK_STACK; } + if (config->port) { + cfg->port = config->port; + } + err = ESP_ERR_NO_MEM; if (config->host) { + free(cfg->host); cfg->host = strdup(config->host); ESP_MEM_CHECK(TAG, cfg->host, goto _mqtt_set_config_failed); } - cfg->port = config->port; if (config->username) { + free(client->connect_info.username); client->connect_info.username = strdup(config->username); ESP_MEM_CHECK(TAG, client->connect_info.username, goto _mqtt_set_config_failed); } if (config->password) { + free(client->connect_info.password); client->connect_info.password = strdup(config->password); ESP_MEM_CHECK(TAG, client->connect_info.password, goto _mqtt_set_config_failed); } if (config->client_id) { + free(client->connect_info.client_id); client->connect_info.client_id = strdup(config->client_id); - } else { + ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); + } else if (client->connect_info.client_id == NULL) { client->connect_info.client_id = platform_create_id_string(); } ESP_MEM_CHECK(TAG, client->connect_info.client_id, goto _mqtt_set_config_failed); ESP_LOGD(TAG, "MQTT client_id=%s", client->connect_info.client_id); if (config->uri) { + free(cfg->uri); cfg->uri = strdup(config->uri); ESP_MEM_CHECK(TAG, cfg->uri, goto _mqtt_set_config_failed); } if (config->lwt_topic) { + free(client->connect_info.will_topic); client->connect_info.will_topic = strdup(config->lwt_topic); ESP_MEM_CHECK(TAG, client->connect_info.will_topic, goto _mqtt_set_config_failed); } - if (config->lwt_msg_len) { + if (config->lwt_msg_len && config->lwt_msg) { + free(client->connect_info.will_message); client->connect_info.will_message = malloc(config->lwt_msg_len); ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); memcpy(client->connect_info.will_message, config->lwt_msg, config->lwt_msg_len); client->connect_info.will_length = config->lwt_msg_len; } else if (config->lwt_msg) { + free(client->connect_info.will_message); client->connect_info.will_message = strdup(config->lwt_msg); ESP_MEM_CHECK(TAG, client->connect_info.will_message, goto _mqtt_set_config_failed); client->connect_info.will_length = strlen(config->lwt_msg); } - - client->connect_info.will_qos = config->lwt_qos; - client->connect_info.will_retain = config->lwt_retain; - - client->connect_info.clean_session = 1; - if (config->disable_clean_session) { - client->connect_info.clean_session = false; + if (config->lwt_qos) { + client->connect_info.will_qos = config->lwt_qos; + } + if (config->lwt_retain) { + client->connect_info.will_retain = config->lwt_retain; + } + + if (config->disable_clean_session == client->connect_info.clean_session) { + client->connect_info.clean_session = !config->disable_clean_session; + } + if (config->keepalive) { + client->connect_info.keepalive = config->keepalive; } - client->connect_info.keepalive = config->keepalive; if (client->connect_info.keepalive == 0) { client->connect_info.keepalive = MQTT_KEEPALIVE_TICK; } cfg->network_timeout_ms = MQTT_NETWORK_TIMEOUT_MS; - cfg->user_context = config->user_context; - cfg->event_handle = config->event_handle; - cfg->auto_reconnect = true; - if (config->disable_auto_reconnect) { - cfg->auto_reconnect = false; + if (config->user_context) { + cfg->user_context = config->user_context; } + if (config->event_handle) { + cfg->event_handle = config->event_handle; + } - return err; + if (config->refresh_connection_after_ms) { + cfg->refresh_connection_after_ms = config->refresh_connection_after_ms; + } + + cfg->auto_reconnect = true; + if (config->disable_auto_reconnect == cfg->auto_reconnect) { + cfg->auto_reconnect = !config->disable_auto_reconnect; + } + MQTT_API_UNLOCK(client); + return ESP_OK; _mqtt_set_config_failed: esp_mqtt_destroy_config(client); + MQTT_API_UNLOCK(client); return err; } @@ -179,11 +233,13 @@ static esp_err_t esp_mqtt_destroy_config(esp_mqtt_client_handle_t client) free(cfg->uri); free(cfg->path); free(cfg->scheme); + memset(cfg, 0, sizeof(mqtt_config_storage_t)); free(client->connect_info.will_topic); free(client->connect_info.will_message); free(client->connect_info.client_id); free(client->connect_info.username); free(client->connect_info.password); + memset(&client->connect_info, 0, sizeof(mqtt_connect_info_t)); free(client->config); return ESP_OK; } @@ -255,10 +311,10 @@ static esp_err_t esp_mqtt_abort_connection(esp_mqtt_client_handle_t client) client->wait_timeout_ms = MQTT_RECONNECT_TIMEOUT_MS; client->reconnect_tick = platform_tick_get_ms(); client->state = MQTT_STATE_WAIT_TIMEOUT; - ESP_LOGI(TAG, "Reconnect after %d ms", client->wait_timeout_ms); + ESP_LOGD(TAG, "Reconnect after %d ms", client->wait_timeout_ms); client->event.event_id = MQTT_EVENT_DISCONNECTED; client->wait_for_ping_resp = false; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); return ESP_OK; } @@ -266,9 +322,13 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co { esp_mqtt_client_handle_t client = calloc(1, sizeof(struct esp_mqtt_client)); ESP_MEM_CHECK(TAG, client, return NULL); - + client->api_lock = xSemaphoreCreateMutex(); + if (!client->api_lock) { + free(client); + return NULL; + } esp_mqtt_set_config(client, config); - + MQTT_API_LOCK(client); client->transport_list = esp_transport_list_init(); ESP_MEM_CHECK(TAG, client->transport_list, goto _mqtt_init_failed); @@ -328,13 +388,9 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co } } - if (client->config->scheme == NULL) { - client->config->scheme = create_string("mqtt", 4); - ESP_MEM_CHECK(TAG, client->config->scheme, goto _mqtt_init_failed); - } - client->keepalive_tick = platform_tick_get_ms(); client->reconnect_tick = platform_tick_get_ms(); + client->refresh_connection_tick = platform_tick_get_ms(); client->wait_for_ping_resp = false; int buffer_size = config->buffer_size; if (buffer_size <= 0) { @@ -353,9 +409,11 @@ esp_mqtt_client_handle_t esp_mqtt_client_init(const esp_mqtt_client_config_t *co ESP_MEM_CHECK(TAG, client->outbox, goto _mqtt_init_failed); client->status_bits = xEventGroupCreate(); ESP_MEM_CHECK(TAG, client->status_bits, goto _mqtt_init_failed); + MQTT_API_UNLOCK(client); return client; _mqtt_init_failed: esp_mqtt_client_destroy(client); + MQTT_API_UNLOCK(client); return NULL; } @@ -368,6 +426,7 @@ esp_err_t esp_mqtt_client_destroy(esp_mqtt_client_handle_t client) vEventGroupDelete(client->status_bits); free(client->mqtt_state.in_buffer); free(client->mqtt_state.out_buffer); + vSemaphoreDelete(client->api_lock); free(client); return ESP_OK; } @@ -394,19 +453,16 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u return ESP_FAIL; } - if (client->config->scheme == NULL) { - client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len); - } + // set uri overrides actual scheme, host, path if configured previously + free(client->config->scheme); + free(client->config->host); + free(client->config->path); - if (client->config->host == NULL) { - client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len); - } + client->config->scheme = create_string(uri + puri.field_data[UF_SCHEMA].off, puri.field_data[UF_SCHEMA].len); + client->config->host = create_string(uri + puri.field_data[UF_HOST].off, puri.field_data[UF_HOST].len); + client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len); - if (client->config->path == NULL) { - client->config->path = create_string(uri + puri.field_data[UF_PATH].off, puri.field_data[UF_PATH].len); - } if (client->config->path) { -#if MQTT_ENABLE_WSS || MQTT_ENABLE_WS esp_transport_handle_t trans = esp_transport_list_get_transport(client->transport_list, "ws"); if (trans) { esp_transport_ws_set_path(trans, client->config->path); @@ -415,7 +471,6 @@ esp_err_t esp_mqtt_client_set_uri(esp_mqtt_client_handle_t client, const char *u if (trans) { esp_transport_ws_set_path(trans, client->config->path); } -#endif } if (puri.field_data[UF_PORT].len) { @@ -456,9 +511,14 @@ static esp_err_t mqtt_write_data(esp_mqtt_client_handle_t client) return ESP_OK; } -static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +static esp_err_t esp_mqtt_dispatch_event_with_msgid(esp_mqtt_client_handle_t client) { client->event.msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + return esp_mqtt_dispatch_event(client); +} + +static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) +{ client->event.user_context = client->config->user_context; client->event.client = client; @@ -468,32 +528,37 @@ static esp_err_t esp_mqtt_dispatch_event(esp_mqtt_client_handle_t client) return ESP_FAIL; } - -typedef struct { - char *path; - char *buffer; - esp_transport_handle_t parent; -} transport_ws_t; - static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, int length) { - const char *mqtt_topic, *mqtt_data; + const char *mqtt_topic = NULL, *mqtt_data = NULL; uint32_t mqtt_topic_length, mqtt_data_length; - uint32_t mqtt_len, mqtt_offset = 0, total_mqtt_len = 0; - int len_read; + uint32_t mqtt_len = 0, mqtt_offset = 0, total_mqtt_len = 0; + int len_read= length; + int max_to_read = client->mqtt_state.in_buffer_length; + int buffer_offset = 0; esp_transport_handle_t transport = client->transport; do { if (total_mqtt_len == 0) { - mqtt_topic_length = length; - mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length); - mqtt_data_length = length; - mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length); - total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; - mqtt_len = mqtt_data_length; /* any further reading only the underlying payload */ transport = esp_transport_get_payload_transport_handle(transport); + mqtt_data_length = mqtt_topic_length = length; + if (NULL == (mqtt_topic = mqtt_get_publish_topic(message, &mqtt_topic_length)) || + NULL == (mqtt_data = mqtt_get_publish_data(message, &mqtt_data_length)) ) { + // mqtt header is not complete, continue reading + memmove(client->mqtt_state.in_buffer, message, length); + buffer_offset = length; + message = client->mqtt_state.in_buffer; + max_to_read = client->mqtt_state.in_buffer_length - length; + mqtt_len = 0; + } else { + total_mqtt_len = client->mqtt_state.message_length - client->mqtt_state.message_length_read + mqtt_data_length; + mqtt_len = mqtt_data_length; + mqtt_data_length = client->mqtt_state.message_length - ((uint8_t*)mqtt_data- message); + /* read msg id only once */ + client->event.msg_id = mqtt_get_id(message, length); + } } else { mqtt_len = len_read; mqtt_data = (const char*)client->mqtt_state.in_buffer; @@ -501,15 +566,17 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i mqtt_topic_length = 0; } - ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); - client->event.event_id = MQTT_EVENT_DATA; - client->event.data = (char *)mqtt_data; - client->event.data_len = mqtt_len; - client->event.total_data_len = total_mqtt_len; - client->event.current_data_offset = mqtt_offset; - client->event.topic = (char *)mqtt_topic; - client->event.topic_len = mqtt_topic_length; - esp_mqtt_dispatch_event(client); + if (total_mqtt_len != 0) { + ESP_LOGD(TAG, "Get data len= %d, topic len=%d", mqtt_len, mqtt_topic_length); + client->event.event_id = MQTT_EVENT_DATA; + client->event.data = (char *)mqtt_data; + client->event.data_len = mqtt_len; + client->event.total_data_len = mqtt_data_length; + client->event.current_data_offset = mqtt_offset; + client->event.topic = (char *)mqtt_topic; + client->event.topic_len = mqtt_topic_length; + esp_mqtt_dispatch_event(client); + } mqtt_offset += mqtt_len; if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length) { @@ -517,18 +584,20 @@ static void deliver_publish(esp_mqtt_client_handle_t client, uint8_t *message, i } len_read = esp_transport_read(transport, - (char *)client->mqtt_state.in_buffer, - client->mqtt_state.message_length - client->mqtt_state.message_length_read > client->mqtt_state.in_buffer_length ? - client->mqtt_state.in_buffer_length : client->mqtt_state.message_length - client->mqtt_state.message_length_read, + (char *)client->mqtt_state.in_buffer + buffer_offset, + client->mqtt_state.message_length - client->mqtt_state.message_length_read > max_to_read ? + max_to_read : client->mqtt_state.message_length - client->mqtt_state.message_length_read, client->config->network_timeout_ms); + length = len_read + buffer_offset; + buffer_offset = 0; + max_to_read = client->mqtt_state.in_buffer_length; if (len_read <= 0) { - ESP_LOGE(TAG, "Read error or timeout: %d", errno); + ESP_LOGE(TAG, "Read error or timeout: len_read=%d, errno=%d", len_read, errno); break; } client->mqtt_state.message_length_read += len_read; } while (1); - } static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int msg_id) @@ -549,19 +618,42 @@ static bool is_valid_mqtt_msg(esp_mqtt_client_handle_t client, int msg_type, int return false; } +static void mqtt_enqueue_oversized(esp_mqtt_client_handle_t client, uint8_t *remaining_data, int remaining_len) +{ + ESP_LOGD(TAG, "mqtt_enqueue_oversized id: %d, type=%d successful", + client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + //lock mutex + outbox_message_t msg = { 0 }; + if (client->mqtt_state.pending_msg_count > 0) { + client->mqtt_state.pending_msg_count --; + } + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; + msg.remaining_data = remaining_data; + msg.remaining_len = remaining_len; + //Copy to queue buffer + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); + + //unlock +} + static void mqtt_enqueue(esp_mqtt_client_handle_t client) { ESP_LOGD(TAG, "mqtt_enqueue id: %d, type=%d successful", client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); //lock mutex if (client->mqtt_state.pending_msg_count > 0) { + outbox_message_t msg = { 0 }; + msg.data = client->mqtt_state.outbound_message->data; + msg.len = client->mqtt_state.outbound_message->length; + msg.msg_id = client->mqtt_state.pending_msg_id; + msg.msg_type = client->mqtt_state.pending_msg_type; + msg.msg_qos = client->mqtt_state.pending_publish_qos; //Copy to queue buffer - outbox_enqueue(client->outbox, - client->mqtt_state.outbound_message->data, - client->mqtt_state.outbound_message->length, - client->mqtt_state.pending_msg_id, - client->mqtt_state.pending_msg_type, - platform_tick_get_ms()); + outbox_enqueue(client->outbox, &msg, platform_tick_get_ms()); } //unlock } @@ -572,8 +664,9 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) uint8_t msg_type; uint8_t msg_qos; uint16_t msg_id; + uint32_t transport_message_offset = 0 ; - read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 1000); + read_len = esp_transport_read(client->transport, (char *)client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length, 0); if (read_len < 0) { ESP_LOGE(TAG, "Read error or end of stream"); @@ -584,89 +677,116 @@ static esp_err_t mqtt_process_receive(esp_mqtt_client_handle_t client) return ESP_OK; } - msg_type = mqtt_get_type(client->mqtt_state.in_buffer); - msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); - msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + // In case of fragmented packet (when receiving a large publish message), the deliver_publish function will read the rest of the message with more transport read, which means the MQTT message length will be greater than the initial transport read length. That explains that the stopping condition is not the equality here + while ( transport_message_offset < read_len ) { + // If the message was valid, get the type, quality of service and id of the message + msg_type = mqtt_get_type(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_qos = mqtt_get_qos(&client->mqtt_state.in_buffer[transport_message_offset]); + msg_id = mqtt_get_id(&client->mqtt_state.in_buffer[transport_message_offset], read_len - transport_message_offset); + client->mqtt_state.message_length_read = read_len - transport_message_offset; + client->mqtt_state.message_length = mqtt_get_total_length(&client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); - ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); - switch (msg_type) - { - case MQTT_MSG_TYPE_SUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "Subscribe successful"); - client->event.event_id = MQTT_EVENT_SUBSCRIBED; - esp_mqtt_dispatch_event(client); - } - break; - case MQTT_MSG_TYPE_UNSUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { - ESP_LOGD(TAG, "UnSubscribe successful"); - client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; - esp_mqtt_dispatch_event(client); - } - break; - case MQTT_MSG_TYPE_PUBLISH: - if (msg_qos == 1) { - client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); - } - else if (msg_qos == 2) { - client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); - } + ESP_LOGD(TAG, "msg_type=%d, msg_id=%d", msg_type, msg_id); - if (msg_qos == 1 || msg_qos == 2) { - ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); - // TODO: Shoule reconnect? - // return ESP_FAIL; + switch (msg_type) + { + case MQTT_MSG_TYPE_SUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_SUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "Subscribe successful"); + client->event.event_id = MQTT_EVENT_SUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_UNSUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_UNSUBSCRIBE, msg_id)) { + ESP_LOGD(TAG, "UnSubscribe successful"); + client->event.event_id = MQTT_EVENT_UNSUBSCRIBED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PUBLISH: + if (msg_qos == 1) { + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + } + else if (msg_qos == 2) { + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); } - } - client->mqtt_state.message_length_read = read_len; - client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); - ESP_LOGI(TAG, "deliver_publish, message_length_read=%d, message_length=%d", read_len, client->mqtt_state.message_length); - deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); - break; - case MQTT_MSG_TYPE_PUBACK: - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); - } + if (msg_qos == 1 || msg_qos == 2) { + ESP_LOGD(TAG, "Queue response QoS: %d", msg_qos); - break; - case MQTT_MSG_TYPE_PUBREC: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); - client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); - mqtt_write_data(client); - break; - case MQTT_MSG_TYPE_PUBREL: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); - client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); - mqtt_write_data(client); + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error write qos msg repsonse, qos = %d", msg_qos); + // TODO: Shoule reconnect? + // return ESP_FAIL; + } + } + // Deliver the publish message + ESP_LOGD(TAG, "deliver_publish, message_length_read=%d, message_length=%d", client->mqtt_state.message_length_read, client->mqtt_state.message_length); + deliver_publish(client, &client->mqtt_state.in_buffer[transport_message_offset], client->mqtt_state.message_length_read); + break; + case MQTT_MSG_TYPE_PUBACK: + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish"); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PUBREC: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREC"); + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + outbox_set_pending(client->outbox, msg_id, CONFIRMED); + mqtt_write_data(client); + break; + case MQTT_MSG_TYPE_PUBREL: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBREL"); + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + mqtt_write_data(client); + break; + case MQTT_MSG_TYPE_PUBCOMP: + ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); + if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { + ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); + client->event.event_id = MQTT_EVENT_PUBLISHED; + esp_mqtt_dispatch_event_with_msgid(client); + } + break; + case MQTT_MSG_TYPE_PINGRESP: + ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); + client->wait_for_ping_resp = false; + break; + } - break; - case MQTT_MSG_TYPE_PUBCOMP: - ESP_LOGD(TAG, "received MQTT_MSG_TYPE_PUBCOMP"); - if (is_valid_mqtt_msg(client, MQTT_MSG_TYPE_PUBLISH, msg_id)) { - ESP_LOGD(TAG, "Receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish"); - client->event.event_id = MQTT_EVENT_PUBLISHED; - esp_mqtt_dispatch_event(client); - } - break; - case MQTT_MSG_TYPE_PINGRESP: - ESP_LOGD(TAG, "MQTT_MSG_TYPE_PINGRESP"); - client->wait_for_ping_resp = false; - break; + transport_message_offset += client->mqtt_state.message_length; } return ESP_OK; } +static esp_err_t mqtt_resend_queued(esp_mqtt_client_handle_t client, outbox_item_handle_t item) +{ + // decode queued data + client->mqtt_state.outbound_message->data = outbox_item_get_data(item, &client->mqtt_state.outbound_message->length, &client->mqtt_state.pending_msg_id, + &client->mqtt_state.pending_msg_type, &client->mqtt_state.pending_publish_qos); + // set duplicate flag for QoS-2 message + if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH &&client->mqtt_state.pending_publish_qos==2) { + mqtt_set_dup(client->mqtt_state.outbound_message->data); + } + + // try to resend the data + if (mqtt_write_data(client) != ESP_OK) { + ESP_LOGE(TAG, "Error to public data "); + esp_mqtt_abort_connection(client); + return ESP_FAIL; + } + return ESP_OK; +} + static void esp_mqtt_task(void *pv) { esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) pv; + uint32_t last_retransmit = 0; client->run = true; //get transport by scheme @@ -684,9 +804,13 @@ static void esp_mqtt_task(void *pv) client->state = MQTT_STATE_INIT; xEventGroupClearBits(client->status_bits, STOPPED_BIT); while (client->run) { - + MQTT_API_LOCK(client); switch ((int)client->state) { case MQTT_STATE_INIT: + xEventGroupClearBits(client->status_bits, RECONNECT_BIT); + client->event.event_id = MQTT_EVENT_BEFORE_CONNECT; + esp_mqtt_dispatch_event_with_msgid(client); + if (client->transport == NULL) { ESP_LOGE(TAG, "There are no transport"); client->run = false; @@ -709,7 +833,8 @@ static void esp_mqtt_task(void *pv) client->event.event_id = MQTT_EVENT_CONNECTED; client->event.session_present = mqtt_get_connect_session_present(client->mqtt_state.in_buffer); client->state = MQTT_STATE_CONNECTED; - esp_mqtt_dispatch_event(client); + esp_mqtt_dispatch_event_with_msgid(client); + client->refresh_connection_tick = platform_tick_get_ms(); break; case MQTT_STATE_CONNECTED: @@ -719,6 +844,21 @@ static void esp_mqtt_task(void *pv) break; } + // resend all non-transmitted messages first + outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED); + if (item) { + if (mqtt_resend_queued(client, item) == ESP_OK) { + outbox_set_pending(client->outbox, client->mqtt_state.pending_msg_id, TRANSMITTED); + } + // resend other "transmitted" messages after 1s + } else if (platform_tick_get_ms() - last_retransmit > 1000) { + last_retransmit = platform_tick_get_ms(); + item = outbox_dequeue(client->outbox, TRANSMITTED); + if (item) { + mqtt_resend_queued(client, item); + } + } + if (platform_tick_get_ms() - client->keepalive_tick > client->connect_info.keepalive * 1000 / 2) { //No ping resp from last ping => Disconnected if(client->wait_for_ping_resp){ @@ -737,6 +877,13 @@ static void esp_mqtt_task(void *pv) ESP_LOGD(TAG, "PING sent"); } + if (client->config->refresh_connection_after_ms && + platform_tick_get_ms() - client->refresh_connection_tick > client->config->refresh_connection_after_ms) { + ESP_LOGD(TAG, "Refreshing the connection..."); + esp_mqtt_abort_connection(client); + client->state = MQTT_STATE_INIT; + } + //Delete mesaage after 30 senconds outbox_delete_expired(client->outbox, platform_tick_get_ms(), OUTBOX_EXPIRED_TIMEOUT_MS); // @@ -752,10 +899,22 @@ static void esp_mqtt_task(void *pv) client->state = MQTT_STATE_INIT; client->reconnect_tick = platform_tick_get_ms(); ESP_LOGD(TAG, "Reconnecting..."); + break; } - vTaskDelay(client->wait_timeout_ms / 2 / portTICK_RATE_MS); - break; + MQTT_API_UNLOCK(client); + xEventGroupWaitBits(client->status_bits, RECONNECT_BIT, false, true, + client->wait_timeout_ms / 2 / portTICK_RATE_MS); + // continue the while loop insted of break, as the mutex is unlocked + continue; } + MQTT_API_UNLOCK(client); + if (MQTT_STATE_CONNECTED == client->state) { + if (esp_transport_poll_read(client->transport, MQTT_POLL_READ_TIMEOUT_MS) < 0) { + ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno); + esp_mqtt_abort_connection(client); + } + } + } esp_transport_close(client->transport); xEventGroupSetBits(client->status_bits, STOPPED_BIT); @@ -771,13 +930,13 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) } #if MQTT_CORE_SELECTION_ENABLED ESP_LOGD(TAG, "Core selection enabled on %u", MQTT_TASK_CORE); - if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL, MQTT_TASK_CORE) != pdTRUE) { + if (xTaskCreatePinnedToCore(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle, MQTT_TASK_CORE) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); return ESP_FAIL; } #else ESP_LOGD(TAG, "Core selection disabled"); - if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, NULL) != pdTRUE) { + if (xTaskCreate(esp_mqtt_task, "mqtt_task", client->config->task_stack, client, client->config->task_prio, &client->task_handle) != pdTRUE) { ESP_LOGE(TAG, "Error create mqtt task"); return ESP_FAIL; } @@ -785,6 +944,18 @@ esp_err_t esp_mqtt_client_start(esp_mqtt_client_handle_t client) return ESP_OK; } +esp_err_t esp_mqtt_client_reconnect(esp_mqtt_client_handle_t client) +{ + ESP_LOGI(TAG, "Client force reconnect requested"); + if (client->state != MQTT_STATE_WAIT_TIMEOUT) { + ESP_LOGD(TAG, "The client is not waiting for reconnection. Ignore the request"); + return ESP_FAIL; + } + client->wait_timeout_ms = 0; + xEventGroupSetBits(client->status_bits, RECONNECT_BIT); + return ESP_OK; +} + esp_err_t esp_mqtt_client_stop(esp_mqtt_client_handle_t client) { if (client->run) { @@ -816,20 +987,23 @@ int esp_mqtt_client_subscribe(esp_mqtt_client_handle_t client, const char *topic ESP_LOGE(TAG, "Client has not connected"); return -1; } - mqtt_enqueue(client); //move pending msg to outbox (if have) + MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, qos, &client->mqtt_state.pending_msg_id); client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; + mqtt_enqueue(client); //move pending msg to outbox (if have) if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to subscribe topic=%s, qos=%d", topic, qos); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return -1; } ESP_LOGD(TAG, "Sent subscribe topic=%s, id: %d, type=%d successful", topic, client->mqtt_state.pending_msg_id, client->mqtt_state.pending_msg_type); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return client->mqtt_state.pending_msg_id; } @@ -839,7 +1013,7 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top ESP_LOGE(TAG, "Client has not connected"); return -1; } - mqtt_enqueue(client); + MQTT_API_LOCK_FROM_OTHER_TASK(client); client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, topic, &client->mqtt_state.pending_msg_id); @@ -847,27 +1021,28 @@ int esp_mqtt_client_unsubscribe(esp_mqtt_client_handle_t client, const char *top client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_count ++; + mqtt_enqueue(client); if (mqtt_write_data(client) != ESP_OK) { ESP_LOGE(TAG, "Error to unsubscribe topic=%s", topic); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return -1; } ESP_LOGD(TAG, "Sent Unsubscribe topic=%s, id: %d, successful", topic, client->mqtt_state.pending_msg_id); + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return client->mqtt_state.pending_msg_id; } int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain) { uint16_t pending_msg_id = 0; - if (client->state != MQTT_STATE_CONNECTED) { - ESP_LOGE(TAG, "Client has not connected"); - return -1; - } + if (len <= 0) { len = strlen(data); } + MQTT_API_LOCK_FROM_OTHER_TASK(client); mqtt_message_t *publish_msg = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, len, qos, retain, @@ -875,20 +1050,82 @@ int esp_mqtt_client_publish(esp_mqtt_client_handle_t client, const char *topic, /* We have to set as pending all the qos>0 messages) */ if (qos > 0) { - mqtt_enqueue(client); client->mqtt_state.outbound_message = publish_msg; client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); client->mqtt_state.pending_msg_id = pending_msg_id; + client->mqtt_state.pending_publish_qos = qos; client->mqtt_state.pending_msg_count ++; + // by default store as QUEUED (not transmitted yet) + mqtt_enqueue(client); } else { client->mqtt_state.outbound_message = publish_msg; } - if (mqtt_write_data(client) != ESP_OK) { - ESP_LOGE(TAG, "Error to public data to topic=%s, qos=%d", topic, qos); - return -1; + /* Skip sending if not connected (rely on resending) */ + if (client->state != MQTT_STATE_CONNECTED) { + ESP_LOGD(TAG, "Publish: client is not connected"); + goto cannot_publish; } + + /* Provide support for sending fragmented message if it doesn't fit buffer */ + int remaining_len = len; + const char *current_data = data; + bool sending = true; + + while (sending) { + + if (mqtt_write_data(client) != ESP_OK) { + esp_mqtt_abort_connection(client); + goto cannot_publish; + } + + int data_sent = client->mqtt_state.outbound_message->length - client->mqtt_state.outbound_message->fragmented_msg_data_offset; + remaining_len -= data_sent; + current_data += data_sent; + + if (remaining_len > 0) { + mqtt_connection_t* connection = &client->mqtt_state.mqtt_connection; + ESP_LOGD(TAG, "Sending fragmented message, remains to send %d bytes of %d", remaining_len, len); + if (connection->message.fragmented_msg_data_offset) { + // asked to enqueue oversized message (first time only) + connection->message.fragmented_msg_data_offset = 0; + connection->message.fragmented_msg_total_length = 0; + if (qos > 0) { + // internally enqueue all big messages, as they dont fit 'pending msg' structure + mqtt_enqueue_oversized(client, (uint8_t*)current_data, remaining_len); + } + } + + if (remaining_len > connection->buffer_length) { + // Continue with sending + memcpy(connection->buffer, current_data, connection->buffer_length); + connection->message.length = connection->buffer_length; + sending = true; + } else { + memcpy(connection->buffer, current_data, remaining_len); + connection->message.length = remaining_len; + sending = true; + } + connection->message.data = connection->buffer; + client->mqtt_state.outbound_message = &connection->message; + } else { + // Message was sent correctly + sending = false; + } + } + + if (qos > 0) { + outbox_set_pending(client->outbox, pending_msg_id, TRANSMITTED); + } + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); return pending_msg_id; + +cannot_publish: + if (qos == 0) { + ESP_LOGW(TAG, "Publish: Loosing qos0 data when client not connected"); + } + MQTT_API_UNLOCK_FROM_OTHER_TASK(client); + return 0; } diff --git a/examples/protocols/esp-mqtt/ssl/CMakeLists.txt b/examples/protocols/esp-mqtt/ssl/CMakeLists.txt index 13bdd8c2..71718100 100644 --- a/examples/protocols/esp-mqtt/ssl/CMakeLists.txt +++ b/examples/protocols/esp-mqtt/ssl/CMakeLists.txt @@ -6,4 +6,4 @@ include($ENV{IDF_PATH}/tools/cmake/project.cmake) project(mqtt_ssl) -target_add_binary_data(mqtt_ssl.elf "main/iot_eclipse_org.pem" TEXT) +target_add_binary_data(mqtt_ssl.elf "main/mqtt_eclipse_org.pem" TEXT) diff --git a/examples/protocols/esp-mqtt/ssl/README.md b/examples/protocols/esp-mqtt/ssl/README.md index 3d369bdf..80b53938 100644 --- a/examples/protocols/esp-mqtt/ssl/README.md +++ b/examples/protocols/esp-mqtt/ssl/README.md @@ -2,7 +2,7 @@ (See the README.md file in the upper level 'examples' directory for more information about examples.) -This example connects to the broker iot.eclipse.org using ssl transport and as a demonstration subscribes/unsubscribes and send a message on certain topic. +This example connects to the broker mqtt.eclipse.org using ssl transport and as a demonstration subscribes/unsubscribes and send a message on certain topic. It uses ESP-MQTT library which implements mqtt client to connect to mqtt broker. @@ -22,9 +22,9 @@ make menuconfig * Set ssid and password for the board to connect to AP. -Note how to create a PEM certificate for iot.eclipse.org: +Note how to create a PEM certificate for mqtt.eclipse.org: ``` -openssl s_client -showcerts -connect iot.eclipse.org:8883 /dev/null|openssl x509 -outform PEM >iot_eclipse_org.pem +openssl s_client -showcerts -connect mqtt.eclipse.org:8883 /dev/null|openssl x509 -outform PEM >iot_eclipse_org.pem ``` ### Build and Flash diff --git a/examples/protocols/esp-mqtt/ssl/main/Kconfig.projbuild b/examples/protocols/esp-mqtt/ssl/main/Kconfig.projbuild index 8b4eda41..7cf18965 100644 --- a/examples/protocols/esp-mqtt/ssl/main/Kconfig.projbuild +++ b/examples/protocols/esp-mqtt/ssl/main/Kconfig.projbuild @@ -1,31 +1,32 @@ menu "Example Configuration" -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. + config WIFI_SSID + string "WiFi SSID" + default "myssid" + help + SSID (network name) for the example to connect to. -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. + config WIFI_PASSWORD + string "WiFi Password" + default "mypassword" + help + WiFi password (WPA or WPA2) for the example to use. -config BROKER_URI - string "Broker URL" - default "mqtts://iot.eclipse.org:8883" - help - URL of an mqtt broker which this example connects to. + config BROKER_URI + string "Broker URL" + default "mqtts://mqtt.eclipse.org:8883" + help + URL of an mqtt broker which this example connects to. -config BROKER_CERTIFICATE_OVERRIDE - string "Broker certificate override" - default "" - help - Please leave empty if broker certificate included from a textfile; otherwise fill in a base64 part of PEM format certificate + config BROKER_CERTIFICATE_OVERRIDE + string "Broker certificate override" + default "" + help + Please leave empty if broker certificate included from a textfile; otherwise fill in a base64 part of PEM + format certificate -config BROKER_CERTIFICATE_OVERRIDDEN - bool - default y if BROKER_CERTIFICATE_OVERRIDE != "" + config BROKER_CERTIFICATE_OVERRIDDEN + bool + default y if BROKER_CERTIFICATE_OVERRIDE != "" endmenu diff --git a/examples/protocols/esp-mqtt/ssl/main/app_main.c b/examples/protocols/esp-mqtt/ssl/main/app_main.c index 17818e4e..d53cb930 100644 --- a/examples/protocols/esp-mqtt/ssl/main/app_main.c +++ b/examples/protocols/esp-mqtt/ssl/main/app_main.c @@ -78,11 +78,11 @@ static void wifi_init(void) } #if CONFIG_BROKER_CERTIFICATE_OVERRIDDEN == 1 -static const uint8_t iot_eclipse_org_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; +static const uint8_t mqtt_eclipse_org_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; #else -extern const uint8_t iot_eclipse_org_pem_start[] asm("_binary_iot_eclipse_org_pem_start"); +extern const uint8_t mqtt_eclipse_org_pem_start[] asm("_binary_mqtt_eclipse_org_pem_start"); #endif -extern const uint8_t iot_eclipse_org_pem_end[] asm("_binary_iot_eclipse_org_pem_end"); +extern const uint8_t mqtt_eclipse_org_pem_end[] asm("_binary_mqtt_eclipse_org_pem_end"); static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) { @@ -123,6 +123,12 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + if (event->error_handle->error_type == MQTT_ERROR_TYPE_CONNECTION_REFUSED) { + ESP_LOGI(TAG, "Connection refused error: 0x%x", event->error_handle->connect_return_code); + } + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } return ESP_OK; @@ -134,7 +140,7 @@ static void mqtt_app_start(void) const esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URI, .event_handle = mqtt_event_handler, - .cert_pem = (const char *)iot_eclipse_org_pem_start, + .cert_pem = (const char *)mqtt_eclipse_org_pem_start, }; ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); diff --git a/examples/protocols/esp-mqtt/ssl/main/component.mk b/examples/protocols/esp-mqtt/ssl/main/component.mk index 797c4a1f..597752fb 100644 --- a/examples/protocols/esp-mqtt/ssl/main/component.mk +++ b/examples/protocols/esp-mqtt/ssl/main/component.mk @@ -1 +1 @@ -COMPONENT_EMBED_TXTFILES := iot_eclipse_org.pem +COMPONENT_EMBED_TXTFILES := mqtt_eclipse_org.pem diff --git a/examples/protocols/esp-mqtt/ssl/main/iot_eclipse_org.pem b/examples/protocols/esp-mqtt/ssl/main/mqtt_eclipse_org.pem similarity index 100% rename from examples/protocols/esp-mqtt/ssl/main/iot_eclipse_org.pem rename to examples/protocols/esp-mqtt/ssl/main/mqtt_eclipse_org.pem diff --git a/examples/protocols/esp-mqtt/ssl/mqtt_ssl_example_test.py b/examples/protocols/esp-mqtt/ssl/mqtt_ssl_example_test.py index 3ea27c09..ef9d1a9d 100644 --- a/examples/protocols/esp-mqtt/ssl/mqtt_ssl_example_test.py +++ b/examples/protocols/esp-mqtt/ssl/mqtt_ssl_example_test.py @@ -86,8 +86,8 @@ def test_examples_protocol_mqtt_ssl(env, extra_data): client.on_connect = on_connect client.on_message = on_message client.tls_set(None, - None, - None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None) + None, + None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.tls_insecure_set(True) print("Connecting...") client.connect(broker_url, broker_port, 60) @@ -117,5 +117,6 @@ def test_examples_protocol_mqtt_ssl(env, extra_data): event_stop_client.set() thread1.join() + if __name__ == '__main__': test_examples_protocol_mqtt_ssl() diff --git a/examples/protocols/esp-mqtt/ssl_mutual_auth/main/Kconfig.projbuild b/examples/protocols/esp-mqtt/ssl_mutual_auth/main/Kconfig.projbuild index 176d8fb3..ed4e6916 100644 --- a/examples/protocols/esp-mqtt/ssl_mutual_auth/main/Kconfig.projbuild +++ b/examples/protocols/esp-mqtt/ssl_mutual_auth/main/Kconfig.projbuild @@ -1,15 +1,15 @@ menu "Example Configuration" -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. + config WIFI_SSID + string "WiFi SSID" + default "myssid" + help + SSID (network name) for the example to connect to. -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. + config WIFI_PASSWORD + string "WiFi Password" + default "mypassword" + help + WiFi password (WPA or WPA2) for the example to use. endmenu diff --git a/examples/protocols/esp-mqtt/ssl_mutual_auth/main/app_main.c b/examples/protocols/esp-mqtt/ssl_mutual_auth/main/app_main.c index adf5db9a..cd0117fe 100644 --- a/examples/protocols/esp-mqtt/ssl_mutual_auth/main/app_main.c +++ b/examples/protocols/esp-mqtt/ssl_mutual_auth/main/app_main.c @@ -122,6 +122,9 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; } return ESP_OK; } diff --git a/examples/protocols/esp-mqtt/tcp/main/Kconfig.projbuild b/examples/protocols/esp-mqtt/tcp/main/Kconfig.projbuild index 71f95ea8..57d9b433 100644 --- a/examples/protocols/esp-mqtt/tcp/main/Kconfig.projbuild +++ b/examples/protocols/esp-mqtt/tcp/main/Kconfig.projbuild @@ -1,25 +1,25 @@ menu "Example Configuration" -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. + config WIFI_SSID + string "WiFi SSID" + default "myssid" + help + SSID (network name) for the example to connect to. -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. + config WIFI_PASSWORD + string "WiFi Password" + default "mypassword" + help + WiFi password (WPA or WPA2) for the example to use. -config BROKER_URL - string "Broker URL" - default "mqtt://iot.eclipse.org" - help - URL of the broker to connect to + config BROKER_URL + string "Broker URL" + default "mqtt://mqtt.eclipse.org" + help + URL of the broker to connect to -config BROKER_URL_FROM_STDIN - bool - default y if BROKER_URL = "FROM_STDIN" + config BROKER_URL_FROM_STDIN + bool + default y if BROKER_URL = "FROM_STDIN" endmenu diff --git a/examples/protocols/esp-mqtt/tcp/main/app_main.c b/examples/protocols/esp-mqtt/tcp/main/app_main.c index ea37a740..b78e9d82 100644 --- a/examples/protocols/esp-mqtt/tcp/main/app_main.c +++ b/examples/protocols/esp-mqtt/tcp/main/app_main.c @@ -69,6 +69,9 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; } return ESP_OK; } diff --git a/examples/protocols/esp-mqtt/tcp/mqtt_tcp_example_test.py b/examples/protocols/esp-mqtt/tcp/mqtt_tcp_example_test.py index e2568d52..05e51420 100644 --- a/examples/protocols/esp-mqtt/tcp/mqtt_tcp_example_test.py +++ b/examples/protocols/esp-mqtt/tcp/mqtt_tcp_example_test.py @@ -1,36 +1,53 @@ import re import os import sys -from socket import * +import socket from threading import Thread import struct import time -msgid=-1 + +try: + import IDF +except ImportError: + # this is a test case write with tiny-test-fw. + # to run test cases outside tiny-test-fw, + # we need to set environment variable `TEST_FW_PATH`, + # then get and insert `TEST_FW_PATH` to sys path before import FW module + test_fw_path = os.getenv("TEST_FW_PATH") + if test_fw_path and test_fw_path not in sys.path: + sys.path.insert(0, test_fw_path) + import IDF + +import DUT + +msgid = -1 + def get_my_ip(): - s1 = socket(AF_INET, SOCK_DGRAM) + s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s1.connect(("8.8.8.8", 80)) my_ip = s1.getsockname()[0] s1.close() return my_ip + def mqqt_server_sketch(my_ip, port): global msgid print("Starting the server on {}".format(my_ip)) s = None try: - s=socket(AF_INET, SOCK_STREAM) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(60) s.bind((my_ip, port)) s.listen(1) - q,addr=s.accept() + q,addr = s.accept() q.settimeout(30) print("connection accepted") - except: + except Exception: print("Local server on {}:{} listening/accepting failure: {}" - "Possibly check permissions or firewall settings" - "to accept connections on this address".format(my_ip, port, sys.exc_info()[0])) + "Possibly check permissions or firewall settings" + "to accept connections on this address".format(my_ip, port, sys.exc_info()[0])) raise data = q.recv(1024) # check if received initial empty message @@ -47,20 +64,6 @@ def mqqt_server_sketch(my_ip, port): s.close() print("server closed") -# this is a test case write with tiny-test-fw. -# to run test cases outside tiny-test-fw, -# we need to set environment variable `TEST_FW_PATH`, -# then get and insert `TEST_FW_PATH` to sys path before import FW module -test_fw_path = os.getenv("TEST_FW_PATH") -if test_fw_path and test_fw_path not in sys.path: - sys.path.insert(0, test_fw_path) - -import TinyFW -import IDF -import DUT - - - @IDF.idf_example_test(env_tag="Example_WIFI") def test_examples_protocol_mqtt_qos1(env, extra_data): @@ -76,14 +79,14 @@ def test_examples_protocol_mqtt_qos1(env, extra_data): # check and log bin size binary_file = os.path.join(dut1.app.binary_path, "mqtt_tcp.bin") bin_size = os.path.getsize(binary_file) - IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size//1024)) - IDF.check_performance("mqtt_tcp_size", bin_size//1024) + IDF.log_performance("mqtt_tcp_bin_size", "{}KB".format(bin_size // 1024)) + IDF.check_performance("mqtt_tcp_size", bin_size // 1024) # 1. start mqtt broker sketch host_ip = get_my_ip() - thread1 = Thread(target = mqqt_server_sketch, args = (host_ip,1883)) + thread1 = Thread(target=mqqt_server_sketch, args=(host_ip,1883)) thread1.start() # 2. start the dut test and wait till client gets IP address - dut1.start_app() + dut1.start_app() # waiting for getting the IP address try: ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30) @@ -91,10 +94,10 @@ def test_examples_protocol_mqtt_qos1(env, extra_data): except DUT.ExpectTimeout: raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP') - print ("writing to device: {}".format("mqtt://" + host_ip + "\n")) + print("writing to device: {}".format("mqtt://" + host_ip + "\n")) dut1.write("mqtt://" + host_ip + "\n") thread1.join() - print ("Message id received from server: {}".format(msgid)) + print("Message id received from server: {}".format(msgid)) # 3. check the message id was enqueued and then deleted msgid_enqueued = dut1.expect(re.compile(r"OUTBOX: ENQUEUE msgid=([0-9]+)"), timeout=30) msgid_deleted = dut1.expect(re.compile(r"OUTBOX: DELETED msgid=([0-9]+)"), timeout=30) @@ -105,5 +108,6 @@ def test_examples_protocol_mqtt_qos1(env, extra_data): print("Failure!") raise ValueError('Mismatch of msgid: received: {}, enqueued {}, deleted {}'.format(msgid, msgid_enqueued, msgid_deleted)) + if __name__ == '__main__': test_examples_protocol_mqtt_qos1() diff --git a/examples/protocols/esp-mqtt/ws/README.md b/examples/protocols/esp-mqtt/ws/README.md index 619519d9..905d2080 100644 --- a/examples/protocols/esp-mqtt/ws/README.md +++ b/examples/protocols/esp-mqtt/ws/README.md @@ -2,7 +2,7 @@ (See the README.md file in the upper level 'examples' directory for more information about examples.) -This example connects to the broker iot.eclipse.org over web sockets as a demonstration subscribes/unsubscribes and send a message on certain topic. +This example connects to the broker mqtt.eclipse.org over web sockets as a demonstration subscribes/unsubscribes and send a message on certain topic. It uses ESP-MQTT library which implements mqtt client to connect to mqtt broker. diff --git a/examples/protocols/esp-mqtt/ws/main/Kconfig.projbuild b/examples/protocols/esp-mqtt/ws/main/Kconfig.projbuild index 5223e885..ee66ba88 100644 --- a/examples/protocols/esp-mqtt/ws/main/Kconfig.projbuild +++ b/examples/protocols/esp-mqtt/ws/main/Kconfig.projbuild @@ -1,21 +1,21 @@ menu "Example Configuration" -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. + config WIFI_SSID + string "WiFi SSID" + default "myssid" + help + SSID (network name) for the example to connect to. -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. + config WIFI_PASSWORD + string "WiFi Password" + default "mypassword" + help + WiFi password (WPA or WPA2) for the example to use. -config BROKER_URI - string "Broker URL" - default "ws://iot.eclipse.org:80/ws" - help - URL of an mqtt broker which this example connects to. + config BROKER_URI + string "Broker URL" + default "ws://mqtt.eclipse.org:80/mqtt" + help + URL of an mqtt broker which this example connects to. endmenu diff --git a/examples/protocols/esp-mqtt/ws/main/app_main.c b/examples/protocols/esp-mqtt/ws/main/app_main.c index 6b1e18c8..8ac59ac7 100644 --- a/examples/protocols/esp-mqtt/ws/main/app_main.c +++ b/examples/protocols/esp-mqtt/ws/main/app_main.c @@ -66,6 +66,9 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; } return ESP_OK; } diff --git a/examples/protocols/esp-mqtt/ws/mqtt_ws_example_test.py b/examples/protocols/esp-mqtt/ws/mqtt_ws_example_test.py index b4919bfb..58b8d5e3 100644 --- a/examples/protocols/esp-mqtt/ws/mqtt_ws_example_test.py +++ b/examples/protocols/esp-mqtt/ws/mqtt_ws_example_test.py @@ -82,7 +82,6 @@ def test_examples_protocol_mqtt_ws(env, extra_data): client = mqtt.Client(transport="websockets") client.on_connect = on_connect client.on_message = on_message - client.ws_set_options(path="/ws", headers=None) print("Connecting...") client.connect(broker_url, broker_port, 60) except Exception: @@ -111,5 +110,6 @@ def test_examples_protocol_mqtt_ws(env, extra_data): event_stop_client.set() thread1.join() + if __name__ == '__main__': test_examples_protocol_mqtt_ws() diff --git a/examples/protocols/esp-mqtt/wss/CMakeLists.txt b/examples/protocols/esp-mqtt/wss/CMakeLists.txt index 7ba5e629..f9492616 100644 --- a/examples/protocols/esp-mqtt/wss/CMakeLists.txt +++ b/examples/protocols/esp-mqtt/wss/CMakeLists.txt @@ -6,4 +6,4 @@ include($ENV{IDF_PATH}/tools/cmake/project.cmake) project(mqtt_websocket_secure) -target_add_binary_data(mqtt_websocket_secure.elf "main/iot_eclipse_org.pem" TEXT) +target_add_binary_data(mqtt_websocket_secure.elf "main/mqtt_eclipse_org.pem" TEXT) diff --git a/examples/protocols/esp-mqtt/wss/README.md b/examples/protocols/esp-mqtt/wss/README.md index 43d829cc..c92500ea 100644 --- a/examples/protocols/esp-mqtt/wss/README.md +++ b/examples/protocols/esp-mqtt/wss/README.md @@ -1,7 +1,7 @@ # ESP-MQTT MQTT over WSS Sample application (See the README.md file in the upper level 'examples' directory for more information about examples.) -This example connects to the broker iot.eclipse.org over secure websockets and as a demonstration subscribes/unsubscribes and send a message on certain topic. +This example connects to the broker mqtt.eclipse.org over secure websockets and as a demonstration subscribes/unsubscribes and send a message on certain topic. It uses ESP-MQTT library which implements mqtt client to connect to mqtt broker. @@ -21,10 +21,10 @@ make menuconfig * Set ssid and password for the board to connect to AP. -Note how to create a PEM certificate for iot.eclipse.org: +Note how to create a PEM certificate for mqtt.eclipse.org: ``` -openssl s_client -showcerts -connect iot.eclipse.org:8883 /dev/null|openssl x509 -outform PEM >iot_eclipse_org.pem +openssl s_client -showcerts -connect mqtt.eclipse.org:8883 /dev/null|openssl x509 -outform PEM >iot_eclipse_org.pem ``` ### Build and Flash diff --git a/examples/protocols/esp-mqtt/wss/main/Kconfig.projbuild b/examples/protocols/esp-mqtt/wss/main/Kconfig.projbuild index 964d436f..a91485b7 100644 --- a/examples/protocols/esp-mqtt/wss/main/Kconfig.projbuild +++ b/examples/protocols/esp-mqtt/wss/main/Kconfig.projbuild @@ -1,31 +1,32 @@ menu "Example Configuration" -config WIFI_SSID - string "WiFi SSID" - default "myssid" - help - SSID (network name) for the example to connect to. + config WIFI_SSID + string "WiFi SSID" + default "myssid" + help + SSID (network name) for the example to connect to. -config WIFI_PASSWORD - string "WiFi Password" - default "mypassword" - help - WiFi password (WPA or WPA2) for the example to use. + config WIFI_PASSWORD + string "WiFi Password" + default "mypassword" + help + WiFi password (WPA or WPA2) for the example to use. -config BROKER_URI - string "Broker URL" - default "wss://iot.eclipse.org:443/ws" - help - URL of an mqtt broker which this example connects to. + config BROKER_URI + string "Broker URL" + default "wss://mqtt.eclipse.org:443/mqtt" + help + URL of an mqtt broker which this example connects to. -config BROKER_CERTIFICATE_OVERRIDE - string "Server certificate override" - default "" - help - Please leave empty if server certificate included from a textfile; otherwise fill in a base64 part of PEM format certificate + config BROKER_CERTIFICATE_OVERRIDE + string "Server certificate override" + default "" + help + Please leave empty if server certificate included from a textfile; otherwise fill in a base64 part of PEM + format certificate -config BROKER_CERTIFICATE_OVERRIDDEN - bool - default y if BROKER_CERTIFICATE_OVERRIDE != "" + config BROKER_CERTIFICATE_OVERRIDDEN + bool + default y if BROKER_CERTIFICATE_OVERRIDE != "" endmenu diff --git a/examples/protocols/esp-mqtt/wss/main/app_main.c b/examples/protocols/esp-mqtt/wss/main/app_main.c index 06f6307d..1db48480 100644 --- a/examples/protocols/esp-mqtt/wss/main/app_main.c +++ b/examples/protocols/esp-mqtt/wss/main/app_main.c @@ -78,11 +78,11 @@ static void wifi_init(void) } #if CONFIG_BROKER_CERTIFICATE_OVERRIDDEN == 1 -static const uint8_t iot_eclipse_org_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; +static const uint8_t mqtt_eclipse_org_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; #else -extern const uint8_t iot_eclipse_org_pem_start[] asm("_binary_iot_eclipse_org_pem_start"); +extern const uint8_t mqtt_eclipse_org_pem_start[] asm("_binary_mqtt_eclipse_org_pem_start"); #endif -extern const uint8_t iot_eclipse_org_pem_end[] asm("_binary_iot_eclipse_org_pem_end"); +extern const uint8_t mqtt_eclipse_org_pem_end[] asm("_binary_mqtt_eclipse_org_pem_end"); static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) { @@ -124,6 +124,9 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; } return ESP_OK; } @@ -133,7 +136,7 @@ static void mqtt_app_start(void) const esp_mqtt_client_config_t mqtt_cfg = { .uri = CONFIG_BROKER_URI, .event_handle = mqtt_event_handler, - .cert_pem = (const char *)iot_eclipse_org_pem_start, + .cert_pem = (const char *)mqtt_eclipse_org_pem_start, }; ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); diff --git a/examples/protocols/esp-mqtt/wss/main/component.mk b/examples/protocols/esp-mqtt/wss/main/component.mk index 797c4a1f..597752fb 100644 --- a/examples/protocols/esp-mqtt/wss/main/component.mk +++ b/examples/protocols/esp-mqtt/wss/main/component.mk @@ -1 +1 @@ -COMPONENT_EMBED_TXTFILES := iot_eclipse_org.pem +COMPONENT_EMBED_TXTFILES := mqtt_eclipse_org.pem diff --git a/examples/protocols/esp-mqtt/wss/main/iot_eclipse_org.pem b/examples/protocols/esp-mqtt/wss/main/mqtt_eclipse_org.pem similarity index 100% rename from examples/protocols/esp-mqtt/wss/main/iot_eclipse_org.pem rename to examples/protocols/esp-mqtt/wss/main/mqtt_eclipse_org.pem diff --git a/examples/protocols/esp-mqtt/wss/mqtt_wss_example_test.py b/examples/protocols/esp-mqtt/wss/mqtt_wss_example_test.py index 6aac644c..15ca2834 100644 --- a/examples/protocols/esp-mqtt/wss/mqtt_wss_example_test.py +++ b/examples/protocols/esp-mqtt/wss/mqtt_wss_example_test.py @@ -84,8 +84,8 @@ def test_examples_protocol_mqtt_wss(env, extra_data): client.on_connect = on_connect client.on_message = on_message client.tls_set(None, - None, - None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None) + None, + None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) print("Connecting...") client.connect(broker_url, broker_port, 60) except Exception: @@ -114,5 +114,6 @@ def test_examples_protocol_mqtt_wss(env, extra_data): event_stop_client.set() thread1.join() + if __name__ == '__main__': test_examples_protocol_mqtt_wss()