diff --git a/components/mqtt/Kconfig b/components/mqtt/Kconfig new file mode 100644 index 00000000..0b32f64c --- /dev/null +++ b/components/mqtt/Kconfig @@ -0,0 +1,136 @@ +menu "MQTT(Paho)" + +choice MQTT_VERSION + prompt "MQTT version" + default V3_1 + help + Current supported MQTT version. + +config V3_1 + bool "V3.1" +config V3_1_1 + bool "V3.1.1" +endchoice + +config DEFAULT_MQTT_VERSION + int + default 3 if V3_1 + default 4 if V3_1_1 + +config MQTT_CLIENT_ID + string "MQTT client ID" + default "espressif_sample" + help + MQTT client ID for MQTT broker to identify ESP device. + +config MQTT_KEEP_ALIVE + int "MQTT keep-alive(seconds)" + default 30 + help + MQTT keep alive interval, Recommended value: 30s - 60s. + The last MQTT packet timestamp will be recorded, + a PING request will be sent if (current_timestamp - last_mqtt_packet_timestamp) > MQTT_KEEP_ALIVE. + +config MQTT_USERNAME + string "MQTT username" + default "espressif" + help + Username used for logging to MQTT broker. + Generally, you should use a valid MQTT_USERNAME if MQTT broker does not allow an anonymous login. + +config MQTT_PASSWORD + string "MQTT password" + default "admin" + help + Password used for logging to MQTT broker. + Generally, you should use a valid MQTT_PASSWORD if MQTT broker does not allow an anonymous login. + +choice MQTT_SESSION + prompt "MQTT Session" + default CLEAN_SESSION + help + Clean session to start a new session. + If clean-seesion is set, it will discard any previous session and start a new one. + If keep-session is set, it will store session state and the communication can resume. + +config CLEAN_SESSION + bool "Clean Session" +config KEEP_SESSION + bool "Keep Session" +endchoice + +config DEFAULT_MQTT_SESSION + int + default 0 if KEEP_SESSION + default 1 if CLEAN_SESSION + +choice MQTT_SECURITY + prompt "MQTT over TCP/SSL/TLS feature" + default NO_TLS + help + MQTT over TCP/SSL/TLS. + MQTT_SECURITY=0: MQTT over TCP + MQTT_SECURITY=1: MQTT over TLS with no verify + MQTT_SECURITY=2: MQTT over TLS with verify peer + MQTT_SECURITY=3: MQTT over TLS with verify client + +config NO_TLS + bool "TCP" +config TLS_VERIFY_NONE + bool "TLS Verify None" +config TLS_VERIFY_PEER + bool "TLS Verify Peer" +config TLS_VERIFY_CLIENT + bool "TLS Verify Client" +endchoice + +config DEFAULT_MQTT_SECURITY + int + default 0 if NO_TLS + default 1 if TLS_VERIFY_NONE + default 2 if TLS_VERIFY_PEER + default 3 if TLS_VERIFY_CLIENT + +config MQTT_SEND_BUFFER + int "MQTT send buffer" + default 2048 + help + Recommended value: 1460 - 2048. + Buffer used for sending MQTT messages, including MQTT header, MQTT topic, payload and etc. + +config MQTT_RECV_BUFFER + int "MQTT recv buffer" + default 2048 + help + Recommended value: 1460 - 2048. + Buffer used for receiving MQTT messages, including MQTT header, MQTT topic, payload and etc. + +config MQTT_SEND_CYCLE + int "MQTT send cycle(ms)" + default 30000 + help + Recommended value: 30000 - 60000. + MQTT send interval in every cycle. + A MQTT packet should be sent out in MQTT_SEND_CYCLE, + will block for MQTT_SEND_CYCLE if weak network, and return timeout. + +config MQTT_RECV_CYCLE + int "MQTT recv cycle(ms)" + default 0 + help + Recommended value: 0ms - 500ms. + MQTT receive interval in every cycle. + a MQTT packet should be received in MQTT_RECV_CYCLE, + will block for MQTT_RECV_CYCLE if weak network , and return timeout. + +config MQTT_PING_TIMEOUT + int "MQTT ping timeout(ms)" + default 3000 + help + Recommended value: 3000ms - 10000ms. + MQTT ping timeout. + When MQTT_KEEP_ALIVE expired, it will start sending ping request. + If the ESP device does not receive any ping response within MQTT_PING_TIMEOUT, + it will terminate the MQTT connection. + +endmenu \ No newline at end of file diff --git a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c old mode 100755 new mode 100644 index fd958936..408dd636 --- a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c +++ b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c @@ -14,7 +14,6 @@ * Allan Stockdill-Mander - initial API and implementation and/or initial documentation * Ian Craggs - convert to FreeRTOS *******************************************************************************/ - #include #include @@ -22,7 +21,7 @@ #include "MQTTFreeRTOS.h" -int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) +int ThreadStart(Thread *thread, void (*fn)(void *), void *arg) { int rc = 0; uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5); @@ -39,63 +38,60 @@ int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) } -void MutexInit(Mutex* mutex) +void MutexInit(Mutex *mutex) { mutex->sem = xSemaphoreCreateMutex(); } -int MutexLock(Mutex* mutex) +int MutexLock(Mutex *mutex) { return xSemaphoreTake(mutex->sem, portMAX_DELAY); } -int MutexUnlock(Mutex* mutex) +int MutexUnlock(Mutex *mutex) { - int x = xSemaphoreGive(mutex->sem); - taskYIELD(); - return x; + return xSemaphoreGive(mutex->sem); } -void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) +void TimerCountdownMS(Timer *timer, unsigned int timeout_ms) { timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */ vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */ } -void TimerCountdown(Timer* timer, unsigned int timeout) +void TimerCountdown(Timer *timer, unsigned int timeout) { TimerCountdownMS(timer, timeout * 1000); } -int TimerLeftMS(Timer* timer) +int TimerLeftMS(Timer *timer) { xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */ - return timer->xTicksToWait * portTICK_PERIOD_MS; + return (timer->xTicksToWait < 0) ? 0 : (timer->xTicksToWait * portTICK_PERIOD_MS); } -char TimerIsExpired(Timer* timer) +char TimerIsExpired(Timer *timer) { return xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait) == pdTRUE; } -void TimerInit(Timer* timer) +void TimerInit(Timer *timer) { timer->xTicksToWait = 0; memset(&timer->xTimeOut, '\0', sizeof(timer->xTimeOut)); } -static int esp_read(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) +static int esp_read(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms) { portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ xTimeOutType xTimeOut; - int recvLen = 0; - int rc = 0; + int recvLen = 0, rc = 0, ret = 0; struct timeval timeout; fd_set fdset; @@ -108,32 +104,36 @@ static int esp_read(Network* n, unsigned char* buffer, unsigned int len, unsigne vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ - if (select(n->my_socket + 1, &fdset, NULL, NULL, &timeout) > 0) { - if (FD_ISSET(n->my_socket, &fdset)) { - do { - rc = recv(n->my_socket, buffer + recvLen, len - recvLen, MSG_DONTWAIT); + ret = select(n->my_socket + 1, &fdset, NULL, NULL, &timeout); - if (rc > 0) { - recvLen += rc; - } else if (rc < 0) { - recvLen = rc; - break; - } - } while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE); - } + if (ret <= 0) { + // ret == 0: timeout + // ret < 0: socket err + return ret; + } + + if (FD_ISSET(n->my_socket, &fdset)) { + do { + rc = recv(n->my_socket, buffer + recvLen, len - recvLen, MSG_DONTWAIT); + + if (rc > 0) { + recvLen += rc; + } else if (rc < 0) { + recvLen = rc; + break; + } + } while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE); } return recvLen; } -static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) +static int esp_write(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms) { portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ xTimeOutType xTimeOut; - int sentLen = 0; - int rc = 0; - int readysock; + int sentLen = 0, rc = 0, ret = 0; struct timeval timeout; fd_set fdset; @@ -143,12 +143,15 @@ static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsign timeout.tv_sec = 0; timeout.tv_usec = timeout_ms * 1000; - vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ - do { - readysock = select(n->my_socket + 1, NULL, &fdset, NULL, &timeout); - } while (readysock <= 0); + ret = select(n->my_socket + 1, NULL, &fdset, NULL, &timeout); + + if (ret <= 0) { + // ret == 0: timeout + // ret < 0: socket err + return ret; + } if (FD_ISSET(n->my_socket, &fdset)) { do { @@ -167,13 +170,13 @@ static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsign } -static void esp_disconnect(Network* n) +static void esp_disconnect(Network *n) { close(n->my_socket); } -void NetworkInit(Network* n) +void NetworkInit(Network *n) { n->my_socket = 0; n->mqttread = esp_read; @@ -182,25 +185,25 @@ void NetworkInit(Network* n) } -int NetworkConnect(Network* n, char* addr, int port) +int NetworkConnect(Network *n, char *addr, int port) { struct sockaddr_in sAddr; int retVal = -1; - struct hostent* ipAddress; + struct hostent *ipAddress; if ((ipAddress = gethostbyname(addr)) == 0) { goto exit; } sAddr.sin_family = AF_INET; - sAddr.sin_addr.s_addr = ((struct in_addr*)(ipAddress->h_addr))->s_addr; + sAddr.sin_addr.s_addr = ((struct in_addr *)(ipAddress->h_addr))->s_addr; sAddr.sin_port = htons(port); if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { goto exit; } - if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0) { + if ((retVal = connect(n->my_socket, (struct sockaddr *)&sAddr, sizeof(sAddr))) < 0) { close(n->my_socket); goto exit; } @@ -210,13 +213,13 @@ exit: } #ifdef CONFIG_SSL_USING_MBEDTLS -static int esp_ssl_read(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) +static int esp_ssl_read(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms) { portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ xTimeOutType xTimeOut; int recvLen = 0; int rc = 0; - static unsigned char* read_buffer; + static unsigned char *read_buffer; struct timeval timeout; fd_set readset; @@ -297,7 +300,7 @@ static int esp_ssl_read(Network* n, unsigned char* buffer, unsigned int len, uns } -static int esp_ssl_write(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) +static int esp_ssl_write(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms) { portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ xTimeOutType xTimeOut; @@ -342,7 +345,7 @@ static int esp_ssl_write(Network* n, unsigned char* buffer, unsigned int len, un } -static void esp_ssl_disconnect(Network* n) +static void esp_ssl_disconnect(Network *n) { close(n->my_socket); SSL_free(n->ssl); @@ -351,7 +354,7 @@ static void esp_ssl_disconnect(Network* n) } -void NetworkInitSSL(Network* n) +void NetworkInitSSL(Network *n) { n->my_socket = 0; n->mqttread = esp_ssl_read; @@ -363,11 +366,11 @@ void NetworkInitSSL(Network* n) } -int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cck, const SSL_METHOD* method, int verify_mode, size_t frag_len) +int NetworkConnectSSL(Network *n, char *addr, int port, ssl_ca_crt_key_t *ssl_cck, const SSL_METHOD *method, int verify_mode, size_t frag_len) { struct sockaddr_in sAddr; int retVal = -1; - struct hostent* ipAddress; + struct hostent *ipAddress; if ((ipAddress = gethostbyname(addr)) == 0) { goto exit; @@ -408,14 +411,14 @@ int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cc } sAddr.sin_family = AF_INET; - sAddr.sin_addr.s_addr = ((struct in_addr*)(ipAddress->h_addr))->s_addr; + sAddr.sin_addr.s_addr = ((struct in_addr *)(ipAddress->h_addr))->s_addr; sAddr.sin_port = htons(port); if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { goto exit1; } - if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0) { + if ((retVal = connect(n->my_socket, (struct sockaddr *)&sAddr, sizeof(sAddr))) < 0) { goto exit2; } diff --git a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h old mode 100755 new mode 100644 index 415c786d..0e48f944 --- a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h +++ b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h @@ -14,7 +14,7 @@ * Allan Stockdill-Mander - initial API and implementation and/or initial documentation *******************************************************************************/ -#if !defined(MQTTFreeRTOS_H) +#ifndef MQTTFreeRTOS_H #define MQTTFreeRTOS_H #include "freertos/FreeRTOS.h" @@ -25,49 +25,50 @@ #include "openssl/ssl.h" #endif -typedef struct Timer -{ +#define OVER_TCP 0 // 0: MQTT over TCP +#define TLS_VERIFY_NONE 1 // 1: enable SSL/TLS, but there is no a certificate verify +#define TLS_VERIFY_PEER 2 // 2: enable SSL/TLS, and verify the MQTT broker certificate +#define TLS_VERIFY_CLIENT 3 // 3: enable SSL/TLS, and verify the MQTT broker certificate, and enable certificate for MQTT broker + +typedef struct Timer { TickType_t xTicksToWait; TimeOut_t xTimeOut; } Timer; typedef struct Network Network; -struct Network -{ +struct Network { int my_socket; - int (*mqttread)(Network*, unsigned char*, unsigned int, unsigned int); - int (*mqttwrite)(Network*, unsigned char*, unsigned int, unsigned int); - void (*disconnect)(Network*); + int (*mqttread)(Network *, unsigned char *, unsigned int, unsigned int); + int (*mqttwrite)(Network *, unsigned char *, unsigned int, unsigned int); + void (*disconnect)(Network *); int read_count; #ifdef CONFIG_SSL_USING_MBEDTLS - SSL_CTX* ctx; - SSL* ssl; + SSL_CTX *ctx; + SSL *ssl; #endif }; -void TimerInit(Timer*); -char TimerIsExpired(Timer*); -void TimerCountdownMS(Timer*, unsigned int); -void TimerCountdown(Timer*, unsigned int); -int TimerLeftMS(Timer*); +void TimerInit(Timer *); +char TimerIsExpired(Timer *); +void TimerCountdownMS(Timer *, unsigned int); +void TimerCountdown(Timer *, unsigned int); +int TimerLeftMS(Timer *); -typedef struct Mutex -{ +typedef struct Mutex { SemaphoreHandle_t sem; } Mutex; -void MutexInit(Mutex*); -int MutexLock(Mutex*); -int MutexUnlock(Mutex*); +void MutexInit(Mutex *); +int MutexLock(Mutex *); +int MutexUnlock(Mutex *); -typedef struct Thread -{ +typedef struct Thread { TaskHandle_t task; } Thread; -int ThreadStart(Thread*, void (*fn)(void*), void* arg); +int ThreadStart(Thread *, void (*fn)(void *), void *arg); /** * @brief Initialize the network structure @@ -76,7 +77,7 @@ int ThreadStart(Thread*, void (*fn)(void*), void* arg); * * @return void */ -void NetworkInit(Network*); +void NetworkInit(Network *); /** * @brief connect with mqtt broker @@ -87,15 +88,15 @@ void NetworkInit(Network*); * * @return connect status */ -int NetworkConnect(Network* n, char* addr, int port); +int NetworkConnect(Network *n, char *addr, int port); #ifdef CONFIG_SSL_USING_MBEDTLS typedef struct ssl_ca_crt_key { - unsigned char* cacrt; + unsigned char *cacrt; unsigned int cacrt_len; - unsigned char* cert; + unsigned char *cert; unsigned int cert_len; - unsigned char* key; + unsigned char *key; unsigned int key_len; } ssl_ca_crt_key_t; @@ -106,7 +107,7 @@ typedef struct ssl_ca_crt_key { * * @return void */ -void NetworkInitSSL(Network* n); +void NetworkInitSSL(Network *n); /** * @brief Use SSL to connect with mqtt broker @@ -121,7 +122,7 @@ void NetworkInitSSL(Network* n); * * @return connect status */ -int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cck, const SSL_METHOD* method, int verify_mode, unsigned int frag_len); +int NetworkConnectSSL(Network *n, char *addr, int port, ssl_ca_crt_key_t *ssl_cck, const SSL_METHOD *method, int verify_mode, unsigned int frag_len); /*int NetworkConnectTLS(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);*/ diff --git a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c old mode 100755 new mode 100644 index 4c837eed..ba251d37 --- a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c +++ b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c @@ -15,58 +15,93 @@ * Ian Craggs - fix for #96 - check rem_len in readPacket * Ian Craggs - add ability to set message handler separately #6 *******************************************************************************/ -#include "MQTTClient.h" - #include #include +#include "esp_log.h" +#include "MQTTClient.h" -static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) { +static const char *TAG = "mc"; + +static void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage) +{ md->topicName = aTopicName; md->message = aMessage; } -static int getNextPacketId(MQTTClient *c) { +static int getNextPacketId(MQTTClient *c) +{ return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; } -static int sendPacket(MQTTClient* c, int length, Timer* timer) +static int sendPacket(MQTTClient *c, int length, Timer *timer) { int rc = FAILURE, sent = 0; - while (sent < length && !TimerIsExpired(timer)) - { + while (sent < length && !TimerIsExpired(timer)) { rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer)); - if (rc < 0) // there was an error writing the data + + if (rc < 0) { // there was an error writing the data break; + } + sent += rc; } - if (sent == length) - { + + if (sent == length) { TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet rc = SUCCESS; - } - else + } else { rc = FAILURE; + } + return rc; } - -void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, - unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) +bool MQTTClientInit(MQTTClient *c, Network *network, unsigned int command_timeout_ms, + unsigned char *sendbuf, size_t sendbuf_size, unsigned char *readbuf, size_t readbuf_size) { int i; c->ipstack = network; - for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { c->messageHandlers[i].topicFilter = 0; - c->command_timeout_ms = command_timeout_ms; - c->buf = sendbuf; - c->buf_size = sendbuf_size; - c->readbuf = readbuf; - c->readbuf_size = readbuf_size; + } + + if (command_timeout_ms != 0) { + c->command_timeout_ms = command_timeout_ms; + } else { + c->command_timeout_ms = CONFIG_MQTT_SEND_CYCLE; + } + + if (sendbuf) { + c->buf = sendbuf; + c->buf_size = sendbuf_size; + } else { + c->buf = (unsigned char *)malloc(CONFIG_MQTT_SEND_BUFFER); + + if (c->buf) { + c->buf_size = CONFIG_MQTT_SEND_BUFFER; + } else { + return false; + } + } + + if (readbuf) { + c->readbuf = readbuf; + c->readbuf_size = readbuf_size; + } else { + c->readbuf = (unsigned char *)malloc(CONFIG_MQTT_RECV_BUFFER); + + if (c->readbuf) { + c->readbuf_size = CONFIG_MQTT_RECV_BUFFER; + } else { + return false; + } + } + c->isconnected = 0; c->cleansession = 0; c->ping_outstanding = 0; @@ -74,13 +109,15 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou c->next_packetid = 1; TimerInit(&c->last_sent); TimerInit(&c->last_received); + TimerInit(&c->ping_wait); #if defined(MQTT_TASK) MutexInit(&c->mutex); #endif + return true; } -static int decodePacket(MQTTClient* c, int* value, int timeout) +static int decodePacket(MQTTClient *c, int *value, int timeout) { unsigned char i; int multiplier = 1; @@ -88,27 +125,31 @@ static int decodePacket(MQTTClient* c, int* value, int timeout) const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; *value = 0; - do - { + + do { int rc = MQTTPACKET_READ_ERROR; - if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) - { + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { rc = MQTTPACKET_READ_ERROR; /* bad data */ goto exit; } + rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); - if (rc != 1) + + if (rc != 1) { goto exit; + } + *value += (i & 127) * multiplier; multiplier *= 128; } while ((i & 128) != 0); + exit: return len; } -static int readPacket(MQTTClient* c, Timer* timer) +static int readPacket(MQTTClient *c, Timer *timer) { MQTTHeader header = {0}; int len = 0; @@ -116,16 +157,17 @@ static int readPacket(MQTTClient* c, Timer* timer) /* 1. read the header byte. This has the packet type in it */ int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); - if (rc != 1) + + if (rc != 1) { goto exit; + } len = 1; /* 2. read the remaining length. This is variable in itself */ decodePacket(c, &rem_len, TimerLeftMS(timer)); len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ - if (rem_len > (c->readbuf_size - len)) - { + if (rem_len > (c->readbuf_size - len)) { rc = BUFFER_OVERFLOW; goto exit; } @@ -138,8 +180,11 @@ static int readPacket(MQTTClient* c, Timer* timer) header.byte = c->readbuf[0]; rc = header.bits.type; - if (c->keepAliveInterval > 0) - TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet + + if (c->keepAliveInterval > 0) { + TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet + } + exit: return rc; } @@ -148,26 +193,32 @@ exit: // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator -static char isTopicMatched(char* topicFilter, MQTTString* topicName) +static char isTopicMatched(char *topicFilter, MQTTString *topicName) { - char* curf = topicFilter; - char* curn = topicName->lenstring.data; - char* curn_end = curn + topicName->lenstring.len; + char *curf = topicFilter; + char *curn = topicName->lenstring.data; + char *curn_end = curn + topicName->lenstring.len; - while (*curf && curn < curn_end) - { - if (*curn == '/' && *curf != '/') + while (*curf && curn < curn_end) { + if (*curn == '/' && *curf != '/') { break; - if (*curf != '+' && *curf != '#' && *curf != *curn) - break; - if (*curf == '+') - { // skip until we meet the next separator, or end of string - char* nextpos = curn + 1; - while (nextpos < curn_end && *nextpos != '/') - nextpos = ++curn + 1; } - else if (*curf == '#') + + if (*curf != '+' && *curf != '#' && *curf != *curn) { + break; + } + + if (*curf == '+') { + // skip until we meet the next separator, or end of string + char *nextpos = curn + 1; + + while (nextpos < curn_end && *nextpos != '/') { + nextpos = ++curn + 1; + } + } else if (*curf == '#') { curn = curn_end - 1; // skip until end of string + } + curf++; curn++; }; @@ -176,19 +227,16 @@ static char isTopicMatched(char* topicFilter, MQTTString* topicName) } -int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) +int deliverMessage(MQTTClient *c, MQTTString *topicName, MQTTMessage *message) { int i; int rc = FAILURE; // we have to find the right message handler - indexed by topic - for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || - isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) - { - if (c->messageHandlers[i].fp != NULL) - { + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char *)c->messageHandlers[i].topicFilter) || + isTopicMatched((char *)c->messageHandlers[i].topicFilter, topicName))) { + if (c->messageHandlers[i].fp != NULL) { MessageData md; NewMessageData(&md, topicName, message); c->messageHandlers[i].fp(&md); @@ -197,8 +245,7 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) } } - if (rc == FAILURE && c->defaultMessageHandler != NULL) - { + if (rc == FAILURE && c->defaultMessageHandler != NULL) { MessageData md; NewMessageData(&md, topicName, message); c->defaultMessageHandler(&md); @@ -209,25 +256,27 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) } -int keepalive(MQTTClient* c) +int keepalive(MQTTClient *c) { int rc = SUCCESS; - if (c->keepAliveInterval == 0) + if (c->keepAliveInterval == 0) { goto exit; + } - if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) - { - if (c->ping_outstanding) + if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) { + if (c->ping_outstanding && TimerIsExpired(&c->ping_wait)) { rc = FAILURE; /* PINGRESP not received in keepalive interval */ - else - { + } else { Timer timer; TimerInit(&timer); TimerCountdownMS(&timer, 1000); int len = MQTTSerialize_pingreq(c->buf, c->buf_size); - if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet + + if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) { // send the ping packet c->ping_outstanding = 1; + TimerCountdownMS(&c->ping_wait, CONFIG_MQTT_PING_TIMEOUT); + } } } @@ -236,92 +285,112 @@ exit: } -void MQTTCleanSession(MQTTClient* c) +void MQTTCleanSession(MQTTClient *c) { int i = 0; - for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { c->messageHandlers[i].topicFilter = NULL; + } } -void MQTTCloseSession(MQTTClient* c) +void MQTTCloseSession(MQTTClient *c) { + ESP_LOGW(TAG, "mqtt close session"); c->ping_outstanding = 0; c->isconnected = 0; - if (c->cleansession) + + if (c->cleansession) { MQTTCleanSession(c); + } } -int cycle(MQTTClient* c, Timer* timer) +int cycle(MQTTClient *c, Timer *timer) { int len = 0, rc = SUCCESS; int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ - switch (packet_type) - { - default: - /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ - rc = packet_type; + switch (packet_type) { + default: + /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ + rc = packet_type; + goto exit; + + case 0: /* timed out reading packet */ + break; + + case CONNACK: + case PUBACK: + case SUBACK: + case UNSUBACK: + break; + + case PUBLISH: { + MQTTString topicName; + MQTTMessage msg; + int intQoS; + msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ + + if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, + (unsigned char **)&msg.payload, (int *)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) { goto exit; - case 0: /* timed out reading packet */ - break; - case CONNACK: - case PUBACK: - case SUBACK: - case UNSUBACK: - break; - case PUBLISH: - { - MQTTString topicName; - MQTTMessage msg; - int intQoS; - msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ - if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, - (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) - goto exit; - msg.qos = (enum QoS)intQoS; - deliverMessage(c, &topicName, &msg); - if (msg.qos != QOS0) - { - if (msg.qos == QOS1) - len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); - else if (msg.qos == QOS2) - len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); - if (len <= 0) - rc = FAILURE; - else - rc = sendPacket(c, len, timer); - if (rc == FAILURE) - goto exit; // there was a problem - } - break; - } - case PUBREC: - case PUBREL: - { - unsigned short mypacketid; - unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) - rc = FAILURE; - else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, - (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) - rc = FAILURE; - else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet - rc = FAILURE; // there was a problem - if (rc == FAILURE) - goto exit; // there was a problem - break; } - case PUBCOMP: - break; - case PINGRESP: - c->ping_outstanding = 0; - break; + msg.qos = (enum QoS)intQoS; + deliverMessage(c, &topicName, &msg); + + if (msg.qos != QOS0) { + if (msg.qos == QOS1) { + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); + } else if (msg.qos == QOS2) { + len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); + } + + if (len <= 0) { + rc = FAILURE; + } else { + rc = sendPacket(c, len, timer); + } + + if (rc == FAILURE) { + goto exit; // there was a problem + } + } + + break; + } + + case PUBREC: + case PUBREL: { + unsigned short mypacketid; + unsigned char dup, type; + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) { + rc = FAILURE; + } else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, + (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) { + rc = FAILURE; + } else if ((rc = sendPacket(c, len, timer)) != SUCCESS) { // send the PUBREL packet + rc = FAILURE; // there was a problem + } + + if (rc == FAILURE) { + goto exit; // there was a problem + } + + break; + } + + case PUBCOMP: + break; + + case PINGRESP: + c->ping_outstanding = 0; + break; } if (keepalive(c) != SUCCESS) { @@ -330,15 +399,18 @@ int cycle(MQTTClient* c, Timer* timer) } exit: - if (rc == SUCCESS) + + if (rc == SUCCESS) { rc = packet_type; - else if (c->isconnected) + } else if (c->isconnected) { MQTTCloseSession(c); + } + return rc; } -int MQTTYield(MQTTClient* c, int timeout_ms) +int MQTTYield(MQTTClient *c, int timeout_ms) { int rc = SUCCESS; Timer timer; @@ -346,10 +418,8 @@ int MQTTYield(MQTTClient* c, int timeout_ms) TimerInit(&timer); TimerCountdownMS(&timer, timeout_ms); - do - { - if (cycle(c, &timer) < 0) - { + do { + if (cycle(c, &timer) < 0) { rc = FAILURE; break; } @@ -358,20 +428,30 @@ int MQTTYield(MQTTClient* c, int timeout_ms) return rc; } -void MQTTRun(void* parm) +void MQTTRun(void *parm) { Timer timer; - MQTTClient* c = (MQTTClient*)parm; + MQTTClient *c = (MQTTClient *)parm; TimerInit(&timer); - while (1) - { - TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ + while (1) { + TimerCountdownMS(&timer, CONFIG_MQTT_RECV_CYCLE); /* Don't wait too long if no traffic is incoming */ + #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - cycle(c, &timer); + + int rc = cycle(c, &timer); + + if (rc == FAILURE) { + ESP_LOGE(TAG, "MQTTRun cycle failed"); +#if defined(MQTT_TASK) + MutexUnlock(&c->mutex); +#endif + vTaskDelete(NULL); + } + #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif @@ -380,32 +460,29 @@ void MQTTRun(void* parm) #if defined(MQTT_TASK) -int MQTTStartTask(MQTTClient* client) +int MQTTStartTask(MQTTClient *client) { return ThreadStart(&client->thread, &MQTTRun, client); } #endif -int waitfor(MQTTClient* c, int packet_type, Timer* timer) +int waitfor(MQTTClient *c, int packet_type, Timer *timer) { int rc = FAILURE; - do - { - if (TimerIsExpired(timer)) - break; // we timed out + do { + if (TimerIsExpired(timer)) { + break; // we timed out + } + rc = cycle(c, timer); - } - while (rc != packet_type && rc >= 0); + } while (rc != packet_type && rc >= 0); return rc; } - - - -int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data) +int MQTTConnectWithResults(MQTTClient *c, MQTTPacket_connectData *options, MQTTConnackData *data) { Timer connect_timer; int rc = FAILURE; @@ -415,39 +492,47 @@ int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTC #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - if (c->isconnected) /* don't send connect packet again if we are already connected */ + + if (c->isconnected) { /* don't send connect packet again if we are already connected */ goto exit; + } TimerInit(&connect_timer); TimerCountdownMS(&connect_timer, c->command_timeout_ms); - if (options == 0) - options = &default_options; /* set default options if none were supplied */ + if (options == 0) { + options = &default_options; /* set default options if none were supplied */ + } c->keepAliveInterval = options->keepAliveInterval; c->cleansession = options->cleansession; TimerCountdown(&c->last_received, c->keepAliveInterval); - if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) + + if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) { goto exit; - if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet - goto exit; // there was a problem + } + + if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) { // send the connect packet + goto exit; // there was a problem + } // this will be a blocking call, wait for the connack - if (waitfor(c, CONNACK, &connect_timer) == CONNACK) - { + if (waitfor(c, CONNACK, &connect_timer) == CONNACK) { data->rc = 0; data->sessionPresent = 0; - if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) + + if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) { rc = data->rc; - else + } else { rc = FAILURE; - } - else + } + } else { rc = FAILURE; + } exit: - if (rc == SUCCESS) - { + + if (rc == SUCCESS) { c->isconnected = 1; c->ping_outstanding = 0; } @@ -460,57 +545,54 @@ exit: } -int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) +int MQTTConnect(MQTTClient *c, MQTTPacket_connectData *options) { MQTTConnackData data; return MQTTConnectWithResults(c, options, &data); } -int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler) +int MQTTSetMessageHandler(MQTTClient *c, const char *topicFilter, messageHandler messageHandler) { int rc = FAILURE; int i = -1; /* first check for an existing matching slot */ - for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) - { - if (messageHandler == NULL) /* remove existing */ - { + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) { + if (messageHandler == NULL) { /* remove existing */ c->messageHandlers[i].topicFilter = NULL; c->messageHandlers[i].fp = NULL; } + rc = SUCCESS; /* return i when adding new subscription */ break; } } + /* if no existing, look for empty slot (unless we are removing) */ if (messageHandler != NULL) { - if (rc == FAILURE) - { - for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) - { - if (c->messageHandlers[i].topicFilter == NULL) - { + if (rc == FAILURE) { + for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { + if (c->messageHandlers[i].topicFilter == NULL) { rc = SUCCESS; break; } } } - if (i < MAX_MESSAGE_HANDLERS) - { + + if (i < MAX_MESSAGE_HANDLERS) { c->messageHandlers[i].topicFilter = topicFilter; c->messageHandlers[i].fp = messageHandler; } } + return rc; } -int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos, - messageHandler messageHandler, MQTTSubackData* data) +int MQTTSubscribeWithResults(MQTTClient *c, const char *topicFilter, enum QoS qos, + messageHandler messageHandler, MQTTSubackData *data) { int rc = FAILURE; Timer timer; @@ -521,36 +603,44 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - if (!c->isconnected) + + if (!c->isconnected) { goto exit; + } TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); - len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos); - if (len <= 0) - goto exit; - if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet - goto exit; // there was a problem + len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int *)&qos); - if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback - { + if (len <= 0) { + goto exit; + } + + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet + goto exit; // there was a problem + } + + if (waitfor(c, SUBACK, &timer) == SUBACK) { // wait for suback int count = 0; unsigned short mypacketid; data->grantedQoS = QOS0; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1) - { - if (data->grantedQoS != 0x80) + + if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int *)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1) { + if (data->grantedQoS != 0x80) { rc = MQTTSetMessageHandler(c, topicFilter, messageHandler); + } } - } - else + } else { rc = FAILURE; + } exit: + if (rc == FAILURE) { MQTTCloseSession(c); } + #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif @@ -558,15 +648,15 @@ exit: } -int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, - messageHandler messageHandler) +int MQTTSubscribe(MQTTClient *c, const char *topicFilter, enum QoS qos, + messageHandler messageHandler) { MQTTSubackData data; return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data); } -int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) +int MQTTUnsubscribe(MQTTClient *c, const char *topicFilter) { int rc = FAILURE; Timer timer; @@ -577,32 +667,39 @@ int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - if (!c->isconnected) + + if (!c->isconnected) { goto exit; + } TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); - if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) + if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) { goto exit; - if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet - goto exit; // there was a problem + } - if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) - { + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet + goto exit; // there was a problem + } + + if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) { unsigned short mypacketid; // should be the same as the packetid above - if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) - { + + if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) { /* remove the subscription message handler associated with this topic, if there is one */ MQTTSetMessageHandler(c, topicFilter, NULL); } - } - else + } else { rc = FAILURE; + } exit: - if (rc == FAILURE) + + if (rc == FAILURE) { MQTTCloseSession(c); + } + #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif @@ -610,7 +707,7 @@ exit: } -int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) +int MQTTPublish(MQTTClient *c, const char *topicName, MQTTMessage *message) { int rc = FAILURE; Timer timer; @@ -621,50 +718,59 @@ int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - if (!c->isconnected) + + if (!c->isconnected) { goto exit; + } TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); - if (message->qos == QOS1 || message->qos == QOS2) + if (message->qos == QOS1 || message->qos == QOS2) { message->id = getNextPacketId(c); + } len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, - topic, (unsigned char*)message->payload, message->payloadlen); - if (len <= 0) - goto exit; - if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet - goto exit; // there was a problem + topic, (unsigned char *)message->payload, message->payloadlen); - if (message->qos == QOS1) - { - if (waitfor(c, PUBACK, &timer) == PUBACK) - { - unsigned short mypacketid; - unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) - rc = FAILURE; - } - else - rc = FAILURE; + if (len <= 0) { + goto exit; } - else if (message->qos == QOS2) - { - if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) - { + + if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet + goto exit; // there was a problem + } + + if (message->qos == QOS1) { + if (waitfor(c, PUBACK, &timer) == PUBACK) { unsigned short mypacketid; unsigned char dup, type; - if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) { rc = FAILURE; - } - else + } + } else { rc = FAILURE; + } + } else if (message->qos == QOS2) { + if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) { + unsigned short mypacketid; + unsigned char dup, type; + + if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) { + rc = FAILURE; + } + } else { + rc = FAILURE; + } } exit: - if (rc == FAILURE) + + if (rc == FAILURE) { MQTTCloseSession(c); + } + #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif @@ -672,7 +778,7 @@ exit: } -int MQTTDisconnect(MQTTClient* c) +int MQTTDisconnect(MQTTClient *c) { int rc = FAILURE; Timer timer; // we might wait for incomplete incoming publishes to complete @@ -685,8 +791,11 @@ int MQTTDisconnect(MQTTClient* c) TimerCountdownMS(&timer, c->command_timeout_ms); len = MQTTSerialize_disconnect(c->buf, c->buf_size); - if (len > 0) - rc = sendPacket(c, len, &timer); // send the disconnect packet + + if (len > 0) { + rc = sendPacket(c, len, &timer); // send the disconnect packet + } + MQTTCloseSession(c); #if defined(MQTT_TASK) diff --git a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h old mode 100755 new mode 100644 index d1287968..19253d4c --- a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h +++ b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h @@ -20,18 +20,18 @@ #define MQTT_CLIENT_H #if defined(__cplusplus) - extern "C" { +extern "C" { #endif #if defined(WIN32_DLL) || defined(WIN64_DLL) - #define DLLImport __declspec(dllimport) - #define DLLExport __declspec(dllexport) +#define DLLImport __declspec(dllimport) +#define DLLExport __declspec(dllexport) #elif defined(LINUX_SO) - #define DLLImport extern - #define DLLExport __attribute__ ((visibility ("default"))) +#define DLLImport extern +#define DLLExport __attribute__ ((visibility ("default"))) #else - #define DLLImport - #define DLLExport +#define DLLImport +#define DLLExport #endif #include "MQTTPacket.h" @@ -51,7 +51,7 @@ #define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */ #endif -enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 }; +enum QoS { QOS0, QOS1, QOS2, SUBFAIL = 0x80 }; /* all failure return codes must be negative */ enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; @@ -61,20 +61,19 @@ enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; * typedef struct Network { - int (*mqttread)(Network*, unsigned char* read_buffer, int, int); - int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int); + int (*mqttread)(Network*, unsigned char* read_buffer, int, int); + int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int); } Network;*/ /* The Timer structure must be defined in the platform specific header, * and have the following functions to operate on it. */ -extern void TimerInit(Timer*); -extern char TimerIsExpired(Timer*); -extern void TimerCountdownMS(Timer*, unsigned int); -extern void TimerCountdown(Timer*, unsigned int); -extern int TimerLeftMS(Timer*); +extern void TimerInit(Timer *); +extern char TimerIsExpired(Timer *); +extern void TimerCountdownMS(Timer *, unsigned int); +extern void TimerCountdown(Timer *, unsigned int); +extern int TimerLeftMS(Timer *); -typedef struct MQTTMessage -{ +typedef struct MQTTMessage { enum QoS qos; unsigned char retained; unsigned char dup; @@ -83,48 +82,43 @@ typedef struct MQTTMessage size_t payloadlen; } MQTTMessage; -typedef struct MessageData -{ - MQTTMessage* message; - MQTTString* topicName; +typedef struct MessageData { + MQTTMessage *message; + MQTTString *topicName; } MessageData; -typedef struct MQTTConnackData -{ +typedef struct MQTTConnackData { unsigned char rc; unsigned char sessionPresent; } MQTTConnackData; -typedef struct MQTTSubackData -{ +typedef struct MQTTSubackData { enum QoS grantedQoS; } MQTTSubackData; -typedef void (*messageHandler)(MessageData*); +typedef void (*messageHandler)(MessageData *); -typedef struct MQTTClient -{ +typedef struct MQTTClient { unsigned int next_packetid, - command_timeout_ms; + command_timeout_ms; size_t buf_size, - readbuf_size; + readbuf_size; unsigned char *buf, - *readbuf; + *readbuf; unsigned int keepAliveInterval; char ping_outstanding; int isconnected; int cleansession; - struct MessageHandlers - { - const char* topicFilter; - void (*fp) (MessageData*); + struct MessageHandlers { + const char *topicFilter; + void (*fp)(MessageData *); } messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */ - void (*defaultMessageHandler) (MessageData*); + void (*defaultMessageHandler)(MessageData *); - Network* ipstack; - Timer last_sent, last_received; + Network *ipstack; + Timer last_sent, last_received, ping_wait; #if defined(MQTT_TASK) Mutex mutex; Thread thread; @@ -141,23 +135,23 @@ typedef struct MQTTClient * @param command_timeout_ms * @param */ -DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms, - unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size); +DLLExport bool MQTTClientInit(MQTTClient *client, Network *network, unsigned int command_timeout_ms, + unsigned char *sendbuf, size_t sendbuf_size, unsigned char *readbuf, size_t readbuf_size); /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this * @param options - connect options * @return success code */ -DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options, - MQTTConnackData* data); +DLLExport int MQTTConnectWithResults(MQTTClient *client, MQTTPacket_connectData *options, + MQTTConnackData *data); /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this * @param options - connect options * @return success code */ -DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options); +DLLExport int MQTTConnect(MQTTClient *client, MQTTPacket_connectData *options); /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs * @param client - the client object to use @@ -165,7 +159,7 @@ DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options); * @param message - the message to send * @return success code */ -DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*); +DLLExport int MQTTPublish(MQTTClient *client, const char *, MQTTMessage *); /** MQTT SetMessageHandler - set or remove a per topic message handler * @param client - the client object to use @@ -173,7 +167,7 @@ DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*); * @param messageHandler - pointer to the message handler function or NULL to remove * @return success code */ -DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler); +DLLExport int MQTTSetMessageHandler(MQTTClient *c, const char *topicFilter, messageHandler messageHandler); /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. * @param client - the client object to use @@ -181,7 +175,7 @@ DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, mess * @param message - the message to send * @return success code */ -DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler); +DLLExport int MQTTSubscribe(MQTTClient *client, const char *topicFilter, enum QoS, messageHandler); /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. * @param client - the client object to use @@ -190,35 +184,35 @@ DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum Qo * @param data - suback granted QoS returned * @return success code */ -DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data); +DLLExport int MQTTSubscribeWithResults(MQTTClient *client, const char *topicFilter, enum QoS, messageHandler, MQTTSubackData *data); /** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning. * @param client - the client object to use * @param topicFilter - the topic filter to unsubscribe from * @return success code */ -DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter); +DLLExport int MQTTUnsubscribe(MQTTClient *client, const char *topicFilter); /** MQTT Disconnect - send an MQTT disconnect packet and close the connection * @param client - the client object to use * @return success code */ -DLLExport int MQTTDisconnect(MQTTClient* client); +DLLExport int MQTTDisconnect(MQTTClient *client); /** MQTT Yield - MQTT background * @param client - the client object to use * @param time - the time, in milliseconds, to yield for * @return success code */ -DLLExport int MQTTYield(MQTTClient* client, int time); +DLLExport int MQTTYield(MQTTClient *client, int time); /** MQTT isConnected * @param client - the client object to use * @return truth value indicating whether the client is connected to the server */ -static inline DLLExport int MQTTIsConnected(MQTTClient* client) +static inline DLLExport int MQTTIsConnected(MQTTClient *client) { - return client->isconnected; + return client->isconnected; } #if defined(MQTT_TASK) @@ -226,11 +220,11 @@ static inline DLLExport int MQTTIsConnected(MQTTClient* client) * @param client - the client object to use * @return success code */ -DLLExport int MQTTStartTask(MQTTClient* client); +DLLExport int MQTTStartTask(MQTTClient *client); #endif #if defined(__cplusplus) - } +} #endif #endif