feat: refactor mqtt feature

1. add all of mqtt configuration to make-menuconfig
2. fix taskYIELD caused low thouhtput
3. fix block at socket select
4. fix mqtt ping timeout feature
5. fix MQTTPublish/MQTTConnect return -1
6. format code
This commit is contained in:
Chen Wu
2018-12-14 16:26:33 +08:00
parent c10e6d3906
commit ddb9e23b6e
5 changed files with 622 additions and 379 deletions

136
components/mqtt/Kconfig Normal file
View File

@ -0,0 +1,136 @@
menu "MQTT(Paho)"
choice MQTT_VERSION
prompt "MQTT version"
default V3_1
help
Current supported MQTT version.
config V3_1
bool "V3.1"
config V3_1_1
bool "V3.1.1"
endchoice
config DEFAULT_MQTT_VERSION
int
default 3 if V3_1
default 4 if V3_1_1
config MQTT_CLIENT_ID
string "MQTT client ID"
default "espressif_sample"
help
MQTT client ID for MQTT broker to identify ESP device.
config MQTT_KEEP_ALIVE
int "MQTT keep-alive(seconds)"
default 30
help
MQTT keep alive interval, Recommended value: 30s - 60s.
The last MQTT packet timestamp will be recorded,
a PING request will be sent if (current_timestamp - last_mqtt_packet_timestamp) > MQTT_KEEP_ALIVE.
config MQTT_USERNAME
string "MQTT username"
default "espressif"
help
Username used for logging to MQTT broker.
Generally, you should use a valid MQTT_USERNAME if MQTT broker does not allow an anonymous login.
config MQTT_PASSWORD
string "MQTT password"
default "admin"
help
Password used for logging to MQTT broker.
Generally, you should use a valid MQTT_PASSWORD if MQTT broker does not allow an anonymous login.
choice MQTT_SESSION
prompt "MQTT Session"
default CLEAN_SESSION
help
Clean session to start a new session.
If clean-seesion is set, it will discard any previous session and start a new one.
If keep-session is set, it will store session state and the communication can resume.
config CLEAN_SESSION
bool "Clean Session"
config KEEP_SESSION
bool "Keep Session"
endchoice
config DEFAULT_MQTT_SESSION
int
default 0 if KEEP_SESSION
default 1 if CLEAN_SESSION
choice MQTT_SECURITY
prompt "MQTT over TCP/SSL/TLS feature"
default NO_TLS
help
MQTT over TCP/SSL/TLS.
MQTT_SECURITY=0: MQTT over TCP
MQTT_SECURITY=1: MQTT over TLS with no verify
MQTT_SECURITY=2: MQTT over TLS with verify peer
MQTT_SECURITY=3: MQTT over TLS with verify client
config NO_TLS
bool "TCP"
config TLS_VERIFY_NONE
bool "TLS Verify None"
config TLS_VERIFY_PEER
bool "TLS Verify Peer"
config TLS_VERIFY_CLIENT
bool "TLS Verify Client"
endchoice
config DEFAULT_MQTT_SECURITY
int
default 0 if NO_TLS
default 1 if TLS_VERIFY_NONE
default 2 if TLS_VERIFY_PEER
default 3 if TLS_VERIFY_CLIENT
config MQTT_SEND_BUFFER
int "MQTT send buffer"
default 2048
help
Recommended value: 1460 - 2048.
Buffer used for sending MQTT messages, including MQTT header, MQTT topic, payload and etc.
config MQTT_RECV_BUFFER
int "MQTT recv buffer"
default 2048
help
Recommended value: 1460 - 2048.
Buffer used for receiving MQTT messages, including MQTT header, MQTT topic, payload and etc.
config MQTT_SEND_CYCLE
int "MQTT send cycle(ms)"
default 30000
help
Recommended value: 30000 - 60000.
MQTT send interval in every cycle.
A MQTT packet should be sent out in MQTT_SEND_CYCLE,
will block for MQTT_SEND_CYCLE if weak network, and return timeout.
config MQTT_RECV_CYCLE
int "MQTT recv cycle(ms)"
default 0
help
Recommended value: 0ms - 500ms.
MQTT receive interval in every cycle.
a MQTT packet should be received in MQTT_RECV_CYCLE,
will block for MQTT_RECV_CYCLE if weak network , and return timeout.
config MQTT_PING_TIMEOUT
int "MQTT ping timeout(ms)"
default 3000
help
Recommended value: 3000ms - 10000ms.
MQTT ping timeout.
When MQTT_KEEP_ALIVE expired, it will start sending ping request.
If the ESP device does not receive any ping response within MQTT_PING_TIMEOUT,
it will terminate the MQTT connection.
endmenu

View File

@ -14,7 +14,6 @@
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation * Allan Stockdill-Mander - initial API and implementation and/or initial documentation
* Ian Craggs - convert to FreeRTOS * Ian Craggs - convert to FreeRTOS
*******************************************************************************/ *******************************************************************************/
#include <string.h> #include <string.h>
#include <netdb.h> #include <netdb.h>
@ -22,7 +21,7 @@
#include "MQTTFreeRTOS.h" #include "MQTTFreeRTOS.h"
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) int ThreadStart(Thread *thread, void (*fn)(void *), void *arg)
{ {
int rc = 0; int rc = 0;
uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5); uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5);
@ -39,63 +38,60 @@ int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
} }
void MutexInit(Mutex* mutex) void MutexInit(Mutex *mutex)
{ {
mutex->sem = xSemaphoreCreateMutex(); mutex->sem = xSemaphoreCreateMutex();
} }
int MutexLock(Mutex* mutex) int MutexLock(Mutex *mutex)
{ {
return xSemaphoreTake(mutex->sem, portMAX_DELAY); return xSemaphoreTake(mutex->sem, portMAX_DELAY);
} }
int MutexUnlock(Mutex* mutex) int MutexUnlock(Mutex *mutex)
{ {
int x = xSemaphoreGive(mutex->sem); return xSemaphoreGive(mutex->sem);
taskYIELD();
return x;
} }
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) void TimerCountdownMS(Timer *timer, unsigned int timeout_ms)
{ {
timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */ timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */ vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */
} }
void TimerCountdown(Timer* timer, unsigned int timeout) void TimerCountdown(Timer *timer, unsigned int timeout)
{ {
TimerCountdownMS(timer, timeout * 1000); TimerCountdownMS(timer, timeout * 1000);
} }
int TimerLeftMS(Timer* timer) int TimerLeftMS(Timer *timer)
{ {
xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */ xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */
return timer->xTicksToWait * portTICK_PERIOD_MS; return (timer->xTicksToWait < 0) ? 0 : (timer->xTicksToWait * portTICK_PERIOD_MS);
} }
char TimerIsExpired(Timer* timer) char TimerIsExpired(Timer *timer)
{ {
return xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait) == pdTRUE; return xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait) == pdTRUE;
} }
void TimerInit(Timer* timer) void TimerInit(Timer *timer)
{ {
timer->xTicksToWait = 0; timer->xTicksToWait = 0;
memset(&timer->xTimeOut, '\0', sizeof(timer->xTimeOut)); memset(&timer->xTimeOut, '\0', sizeof(timer->xTimeOut));
} }
static int esp_read(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) static int esp_read(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms)
{ {
portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */
xTimeOutType xTimeOut; xTimeOutType xTimeOut;
int recvLen = 0; int recvLen = 0, rc = 0, ret = 0;
int rc = 0;
struct timeval timeout; struct timeval timeout;
fd_set fdset; fd_set fdset;
@ -108,7 +104,14 @@ static int esp_read(Network* n, unsigned char* buffer, unsigned int len, unsigne
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
if (select(n->my_socket + 1, &fdset, NULL, NULL, &timeout) > 0) { ret = select(n->my_socket + 1, &fdset, NULL, NULL, &timeout);
if (ret <= 0) {
// ret == 0: timeout
// ret < 0: socket err
return ret;
}
if (FD_ISSET(n->my_socket, &fdset)) { if (FD_ISSET(n->my_socket, &fdset)) {
do { do {
rc = recv(n->my_socket, buffer + recvLen, len - recvLen, MSG_DONTWAIT); rc = recv(n->my_socket, buffer + recvLen, len - recvLen, MSG_DONTWAIT);
@ -121,19 +124,16 @@ static int esp_read(Network* n, unsigned char* buffer, unsigned int len, unsigne
} }
} while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE); } while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
} }
}
return recvLen; return recvLen;
} }
static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) static int esp_write(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms)
{ {
portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */
xTimeOutType xTimeOut; xTimeOutType xTimeOut;
int sentLen = 0; int sentLen = 0, rc = 0, ret = 0;
int rc = 0;
int readysock;
struct timeval timeout; struct timeval timeout;
fd_set fdset; fd_set fdset;
@ -143,12 +143,15 @@ static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsign
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = timeout_ms * 1000; timeout.tv_usec = timeout_ms * 1000;
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */ vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do { ret = select(n->my_socket + 1, NULL, &fdset, NULL, &timeout);
readysock = select(n->my_socket + 1, NULL, &fdset, NULL, &timeout);
} while (readysock <= 0); if (ret <= 0) {
// ret == 0: timeout
// ret < 0: socket err
return ret;
}
if (FD_ISSET(n->my_socket, &fdset)) { if (FD_ISSET(n->my_socket, &fdset)) {
do { do {
@ -167,13 +170,13 @@ static int esp_write(Network* n, unsigned char* buffer, unsigned int len, unsign
} }
static void esp_disconnect(Network* n) static void esp_disconnect(Network *n)
{ {
close(n->my_socket); close(n->my_socket);
} }
void NetworkInit(Network* n) void NetworkInit(Network *n)
{ {
n->my_socket = 0; n->my_socket = 0;
n->mqttread = esp_read; n->mqttread = esp_read;
@ -182,25 +185,25 @@ void NetworkInit(Network* n)
} }
int NetworkConnect(Network* n, char* addr, int port) int NetworkConnect(Network *n, char *addr, int port)
{ {
struct sockaddr_in sAddr; struct sockaddr_in sAddr;
int retVal = -1; int retVal = -1;
struct hostent* ipAddress; struct hostent *ipAddress;
if ((ipAddress = gethostbyname(addr)) == 0) { if ((ipAddress = gethostbyname(addr)) == 0) {
goto exit; goto exit;
} }
sAddr.sin_family = AF_INET; sAddr.sin_family = AF_INET;
sAddr.sin_addr.s_addr = ((struct in_addr*)(ipAddress->h_addr))->s_addr; sAddr.sin_addr.s_addr = ((struct in_addr *)(ipAddress->h_addr))->s_addr;
sAddr.sin_port = htons(port); sAddr.sin_port = htons(port);
if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
goto exit; goto exit;
} }
if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0) { if ((retVal = connect(n->my_socket, (struct sockaddr *)&sAddr, sizeof(sAddr))) < 0) {
close(n->my_socket); close(n->my_socket);
goto exit; goto exit;
} }
@ -210,13 +213,13 @@ exit:
} }
#ifdef CONFIG_SSL_USING_MBEDTLS #ifdef CONFIG_SSL_USING_MBEDTLS
static int esp_ssl_read(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) static int esp_ssl_read(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms)
{ {
portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */
xTimeOutType xTimeOut; xTimeOutType xTimeOut;
int recvLen = 0; int recvLen = 0;
int rc = 0; int rc = 0;
static unsigned char* read_buffer; static unsigned char *read_buffer;
struct timeval timeout; struct timeval timeout;
fd_set readset; fd_set readset;
@ -297,7 +300,7 @@ static int esp_ssl_read(Network* n, unsigned char* buffer, unsigned int len, uns
} }
static int esp_ssl_write(Network* n, unsigned char* buffer, unsigned int len, unsigned int timeout_ms) static int esp_ssl_write(Network *n, unsigned char *buffer, unsigned int len, unsigned int timeout_ms)
{ {
portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ portTickType xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */
xTimeOutType xTimeOut; xTimeOutType xTimeOut;
@ -342,7 +345,7 @@ static int esp_ssl_write(Network* n, unsigned char* buffer, unsigned int len, un
} }
static void esp_ssl_disconnect(Network* n) static void esp_ssl_disconnect(Network *n)
{ {
close(n->my_socket); close(n->my_socket);
SSL_free(n->ssl); SSL_free(n->ssl);
@ -351,7 +354,7 @@ static void esp_ssl_disconnect(Network* n)
} }
void NetworkInitSSL(Network* n) void NetworkInitSSL(Network *n)
{ {
n->my_socket = 0; n->my_socket = 0;
n->mqttread = esp_ssl_read; n->mqttread = esp_ssl_read;
@ -363,11 +366,11 @@ void NetworkInitSSL(Network* n)
} }
int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cck, const SSL_METHOD* method, int verify_mode, size_t frag_len) int NetworkConnectSSL(Network *n, char *addr, int port, ssl_ca_crt_key_t *ssl_cck, const SSL_METHOD *method, int verify_mode, size_t frag_len)
{ {
struct sockaddr_in sAddr; struct sockaddr_in sAddr;
int retVal = -1; int retVal = -1;
struct hostent* ipAddress; struct hostent *ipAddress;
if ((ipAddress = gethostbyname(addr)) == 0) { if ((ipAddress = gethostbyname(addr)) == 0) {
goto exit; goto exit;
@ -408,14 +411,14 @@ int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cc
} }
sAddr.sin_family = AF_INET; sAddr.sin_family = AF_INET;
sAddr.sin_addr.s_addr = ((struct in_addr*)(ipAddress->h_addr))->s_addr; sAddr.sin_addr.s_addr = ((struct in_addr *)(ipAddress->h_addr))->s_addr;
sAddr.sin_port = htons(port); sAddr.sin_port = htons(port);
if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((n->my_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
goto exit1; goto exit1;
} }
if ((retVal = connect(n->my_socket, (struct sockaddr*)&sAddr, sizeof(sAddr))) < 0) { if ((retVal = connect(n->my_socket, (struct sockaddr *)&sAddr, sizeof(sAddr))) < 0) {
goto exit2; goto exit2;
} }

View File

@ -14,7 +14,7 @@
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation * Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/ *******************************************************************************/
#if !defined(MQTTFreeRTOS_H) #ifndef MQTTFreeRTOS_H
#define MQTTFreeRTOS_H #define MQTTFreeRTOS_H
#include "freertos/FreeRTOS.h" #include "freertos/FreeRTOS.h"
@ -25,49 +25,50 @@
#include "openssl/ssl.h" #include "openssl/ssl.h"
#endif #endif
typedef struct Timer #define OVER_TCP 0 // 0: MQTT over TCP
{ #define TLS_VERIFY_NONE 1 // 1: enable SSL/TLS, but there is no a certificate verify
#define TLS_VERIFY_PEER 2 // 2: enable SSL/TLS, and verify the MQTT broker certificate
#define TLS_VERIFY_CLIENT 3 // 3: enable SSL/TLS, and verify the MQTT broker certificate, and enable certificate for MQTT broker
typedef struct Timer {
TickType_t xTicksToWait; TickType_t xTicksToWait;
TimeOut_t xTimeOut; TimeOut_t xTimeOut;
} Timer; } Timer;
typedef struct Network Network; typedef struct Network Network;
struct Network struct Network {
{
int my_socket; int my_socket;
int (*mqttread)(Network*, unsigned char*, unsigned int, unsigned int); int (*mqttread)(Network *, unsigned char *, unsigned int, unsigned int);
int (*mqttwrite)(Network*, unsigned char*, unsigned int, unsigned int); int (*mqttwrite)(Network *, unsigned char *, unsigned int, unsigned int);
void (*disconnect)(Network*); void (*disconnect)(Network *);
int read_count; int read_count;
#ifdef CONFIG_SSL_USING_MBEDTLS #ifdef CONFIG_SSL_USING_MBEDTLS
SSL_CTX* ctx; SSL_CTX *ctx;
SSL* ssl; SSL *ssl;
#endif #endif
}; };
void TimerInit(Timer*); void TimerInit(Timer *);
char TimerIsExpired(Timer*); char TimerIsExpired(Timer *);
void TimerCountdownMS(Timer*, unsigned int); void TimerCountdownMS(Timer *, unsigned int);
void TimerCountdown(Timer*, unsigned int); void TimerCountdown(Timer *, unsigned int);
int TimerLeftMS(Timer*); int TimerLeftMS(Timer *);
typedef struct Mutex typedef struct Mutex {
{
SemaphoreHandle_t sem; SemaphoreHandle_t sem;
} Mutex; } Mutex;
void MutexInit(Mutex*); void MutexInit(Mutex *);
int MutexLock(Mutex*); int MutexLock(Mutex *);
int MutexUnlock(Mutex*); int MutexUnlock(Mutex *);
typedef struct Thread typedef struct Thread {
{
TaskHandle_t task; TaskHandle_t task;
} Thread; } Thread;
int ThreadStart(Thread*, void (*fn)(void*), void* arg); int ThreadStart(Thread *, void (*fn)(void *), void *arg);
/** /**
* @brief Initialize the network structure * @brief Initialize the network structure
@ -76,7 +77,7 @@ int ThreadStart(Thread*, void (*fn)(void*), void* arg);
* *
* @return void * @return void
*/ */
void NetworkInit(Network*); void NetworkInit(Network *);
/** /**
* @brief connect with mqtt broker * @brief connect with mqtt broker
@ -87,15 +88,15 @@ void NetworkInit(Network*);
* *
* @return connect status * @return connect status
*/ */
int NetworkConnect(Network* n, char* addr, int port); int NetworkConnect(Network *n, char *addr, int port);
#ifdef CONFIG_SSL_USING_MBEDTLS #ifdef CONFIG_SSL_USING_MBEDTLS
typedef struct ssl_ca_crt_key { typedef struct ssl_ca_crt_key {
unsigned char* cacrt; unsigned char *cacrt;
unsigned int cacrt_len; unsigned int cacrt_len;
unsigned char* cert; unsigned char *cert;
unsigned int cert_len; unsigned int cert_len;
unsigned char* key; unsigned char *key;
unsigned int key_len; unsigned int key_len;
} ssl_ca_crt_key_t; } ssl_ca_crt_key_t;
@ -106,7 +107,7 @@ typedef struct ssl_ca_crt_key {
* *
* @return void * @return void
*/ */
void NetworkInitSSL(Network* n); void NetworkInitSSL(Network *n);
/** /**
* @brief Use SSL to connect with mqtt broker * @brief Use SSL to connect with mqtt broker
@ -121,7 +122,7 @@ void NetworkInitSSL(Network* n);
* *
* @return connect status * @return connect status
*/ */
int NetworkConnectSSL(Network* n, char* addr, int port, ssl_ca_crt_key_t* ssl_cck, const SSL_METHOD* method, int verify_mode, unsigned int frag_len); int NetworkConnectSSL(Network *n, char *addr, int port, ssl_ca_crt_key_t *ssl_cck, const SSL_METHOD *method, int verify_mode, unsigned int frag_len);
/*int NetworkConnectTLS(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);*/ /*int NetworkConnectTLS(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);*/

475
components/mqtt/paho/MQTTClient-C/src/MQTTClient.c Executable file → Normal file
View File

@ -15,58 +15,93 @@
* Ian Craggs - fix for #96 - check rem_len in readPacket * Ian Craggs - fix for #96 - check rem_len in readPacket
* Ian Craggs - add ability to set message handler separately #6 * Ian Craggs - add ability to set message handler separately #6
*******************************************************************************/ *******************************************************************************/
#include "MQTTClient.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include "esp_log.h"
#include "MQTTClient.h"
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) { static const char *TAG = "mc";
static void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage)
{
md->topicName = aTopicName; md->topicName = aTopicName;
md->message = aMessage; md->message = aMessage;
} }
static int getNextPacketId(MQTTClient *c) { static int getNextPacketId(MQTTClient *c)
{
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
} }
static int sendPacket(MQTTClient* c, int length, Timer* timer) static int sendPacket(MQTTClient *c, int length, Timer *timer)
{ {
int rc = FAILURE, int rc = FAILURE,
sent = 0; sent = 0;
while (sent < length && !TimerIsExpired(timer)) while (sent < length && !TimerIsExpired(timer)) {
{
rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer)); rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
if (rc < 0) // there was an error writing the data
if (rc < 0) { // there was an error writing the data
break; break;
}
sent += rc; sent += rc;
} }
if (sent == length)
{ if (sent == length) {
TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS; rc = SUCCESS;
} } else {
else
rc = FAILURE; rc = FAILURE;
}
return rc; return rc;
} }
bool MQTTClientInit(MQTTClient *c, Network *network, unsigned int command_timeout_ms,
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, unsigned char *sendbuf, size_t sendbuf_size, unsigned char *readbuf, size_t readbuf_size)
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{ {
int i; int i;
c->ipstack = network; c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
c->messageHandlers[i].topicFilter = 0; c->messageHandlers[i].topicFilter = 0;
}
if (command_timeout_ms != 0) {
c->command_timeout_ms = command_timeout_ms; c->command_timeout_ms = command_timeout_ms;
} else {
c->command_timeout_ms = CONFIG_MQTT_SEND_CYCLE;
}
if (sendbuf) {
c->buf = sendbuf; c->buf = sendbuf;
c->buf_size = sendbuf_size; c->buf_size = sendbuf_size;
} else {
c->buf = (unsigned char *)malloc(CONFIG_MQTT_SEND_BUFFER);
if (c->buf) {
c->buf_size = CONFIG_MQTT_SEND_BUFFER;
} else {
return false;
}
}
if (readbuf) {
c->readbuf = readbuf; c->readbuf = readbuf;
c->readbuf_size = readbuf_size; c->readbuf_size = readbuf_size;
} else {
c->readbuf = (unsigned char *)malloc(CONFIG_MQTT_RECV_BUFFER);
if (c->readbuf) {
c->readbuf_size = CONFIG_MQTT_RECV_BUFFER;
} else {
return false;
}
}
c->isconnected = 0; c->isconnected = 0;
c->cleansession = 0; c->cleansession = 0;
c->ping_outstanding = 0; c->ping_outstanding = 0;
@ -74,13 +109,15 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou
c->next_packetid = 1; c->next_packetid = 1;
TimerInit(&c->last_sent); TimerInit(&c->last_sent);
TimerInit(&c->last_received); TimerInit(&c->last_received);
TimerInit(&c->ping_wait);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexInit(&c->mutex); MutexInit(&c->mutex);
#endif #endif
return true;
} }
static int decodePacket(MQTTClient* c, int* value, int timeout) static int decodePacket(MQTTClient *c, int *value, int timeout)
{ {
unsigned char i; unsigned char i;
int multiplier = 1; int multiplier = 1;
@ -88,27 +125,31 @@ static int decodePacket(MQTTClient* c, int* value, int timeout)
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0; *value = 0;
do
{ do {
int rc = MQTTPACKET_READ_ERROR; int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
{
rc = MQTTPACKET_READ_ERROR; /* bad data */ rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit; goto exit;
} }
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout); rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
if (rc != 1)
if (rc != 1) {
goto exit; goto exit;
}
*value += (i & 127) * multiplier; *value += (i & 127) * multiplier;
multiplier *= 128; multiplier *= 128;
} while ((i & 128) != 0); } while ((i & 128) != 0);
exit: exit:
return len; return len;
} }
static int readPacket(MQTTClient* c, Timer* timer) static int readPacket(MQTTClient *c, Timer *timer)
{ {
MQTTHeader header = {0}; MQTTHeader header = {0};
int len = 0; int len = 0;
@ -116,16 +157,17 @@ static int readPacket(MQTTClient* c, Timer* timer)
/* 1. read the header byte. This has the packet type in it */ /* 1. read the header byte. This has the packet type in it */
int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
if (rc != 1)
if (rc != 1) {
goto exit; goto exit;
}
len = 1; len = 1;
/* 2. read the remaining length. This is variable in itself */ /* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer)); decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (c->readbuf_size - len)) if (rem_len > (c->readbuf_size - len)) {
{
rc = BUFFER_OVERFLOW; rc = BUFFER_OVERFLOW;
goto exit; goto exit;
} }
@ -138,8 +180,11 @@ static int readPacket(MQTTClient* c, Timer* timer)
header.byte = c->readbuf[0]; header.byte = c->readbuf[0];
rc = header.bits.type; rc = header.bits.type;
if (c->keepAliveInterval > 0)
if (c->keepAliveInterval > 0) {
TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
}
exit: exit:
return rc; return rc;
} }
@ -148,26 +193,32 @@ exit:
// assume topic filter and name is in correct format // assume topic filter and name is in correct format
// # can only be at end // # can only be at end
// + and # can only be next to separator // + and # can only be next to separator
static char isTopicMatched(char* topicFilter, MQTTString* topicName) static char isTopicMatched(char *topicFilter, MQTTString *topicName)
{ {
char* curf = topicFilter; char *curf = topicFilter;
char* curn = topicName->lenstring.data; char *curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len; char *curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end) while (*curf && curn < curn_end) {
{ if (*curn == '/' && *curf != '/') {
if (*curn == '/' && *curf != '/')
break; break;
if (*curf != '+' && *curf != '#' && *curf != *curn) }
if (*curf != '+' && *curf != '#' && *curf != *curn) {
break; break;
if (*curf == '+') }
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1; if (*curf == '+') {
while (nextpos < curn_end && *nextpos != '/') // skip until we meet the next separator, or end of string
char *nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/') {
nextpos = ++curn + 1; nextpos = ++curn + 1;
} }
else if (*curf == '#') } else if (*curf == '#') {
curn = curn_end - 1; // skip until end of string curn = curn_end - 1; // skip until end of string
}
curf++; curf++;
curn++; curn++;
}; };
@ -176,19 +227,16 @@ static char isTopicMatched(char* topicFilter, MQTTString* topicName)
} }
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message) int deliverMessage(MQTTClient *c, MQTTString *topicName, MQTTMessage *message)
{ {
int i; int i;
int rc = FAILURE; int rc = FAILURE;
// we have to find the right message handler - indexed by topic // we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
{ if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char *)c->messageHandlers[i].topicFilter) ||
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || isTopicMatched((char *)c->messageHandlers[i].topicFilter, topicName))) {
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) if (c->messageHandlers[i].fp != NULL) {
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md; MessageData md;
NewMessageData(&md, topicName, message); NewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md); c->messageHandlers[i].fp(&md);
@ -197,8 +245,7 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
} }
} }
if (rc == FAILURE && c->defaultMessageHandler != NULL) if (rc == FAILURE && c->defaultMessageHandler != NULL) {
{
MessageData md; MessageData md;
NewMessageData(&md, topicName, message); NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md); c->defaultMessageHandler(&md);
@ -209,25 +256,27 @@ int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
} }
int keepalive(MQTTClient* c) int keepalive(MQTTClient *c)
{ {
int rc = SUCCESS; int rc = SUCCESS;
if (c->keepAliveInterval == 0) if (c->keepAliveInterval == 0) {
goto exit; goto exit;
}
if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) {
{ if (c->ping_outstanding && TimerIsExpired(&c->ping_wait)) {
if (c->ping_outstanding)
rc = FAILURE; /* PINGRESP not received in keepalive interval */ rc = FAILURE; /* PINGRESP not received in keepalive interval */
else } else {
{
Timer timer; Timer timer;
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, 1000); TimerCountdownMS(&timer, 1000);
int len = MQTTSerialize_pingreq(c->buf, c->buf_size); int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) { // send the ping packet
c->ping_outstanding = 1; c->ping_outstanding = 1;
TimerCountdownMS(&c->ping_wait, CONFIG_MQTT_PING_TIMEOUT);
}
} }
} }
@ -236,89 +285,109 @@ exit:
} }
void MQTTCleanSession(MQTTClient* c) void MQTTCleanSession(MQTTClient *c)
{ {
int i = 0; int i = 0;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
c->messageHandlers[i].topicFilter = NULL; c->messageHandlers[i].topicFilter = NULL;
}
} }
void MQTTCloseSession(MQTTClient* c) void MQTTCloseSession(MQTTClient *c)
{ {
ESP_LOGW(TAG, "mqtt close session");
c->ping_outstanding = 0; c->ping_outstanding = 0;
c->isconnected = 0; c->isconnected = 0;
if (c->cleansession)
if (c->cleansession) {
MQTTCleanSession(c); MQTTCleanSession(c);
}
} }
int cycle(MQTTClient* c, Timer* timer) int cycle(MQTTClient *c, Timer *timer)
{ {
int len = 0, int len = 0,
rc = SUCCESS; rc = SUCCESS;
int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
switch (packet_type) switch (packet_type) {
{
default: default:
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
rc = packet_type; rc = packet_type;
goto exit; goto exit;
case 0: /* timed out reading packet */ case 0: /* timed out reading packet */
break; break;
case CONNACK: case CONNACK:
case PUBACK: case PUBACK:
case SUBACK: case SUBACK:
case UNSUBACK: case UNSUBACK:
break; break;
case PUBLISH:
{ case PUBLISH: {
MQTTString topicName; MQTTString topicName;
MQTTMessage msg; MQTTMessage msg;
int intQoS; int intQoS;
msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) (unsigned char **)&msg.payload, (int *)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) {
goto exit; goto exit;
}
msg.qos = (enum QoS)intQoS; msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg); deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{ if (msg.qos != QOS0) {
if (msg.qos == QOS1) if (msg.qos == QOS1) {
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2) } else if (msg.qos == QOS2) {
len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
if (len <= 0) }
if (len <= 0) {
rc = FAILURE; rc = FAILURE;
else } else {
rc = sendPacket(c, len, timer); rc = sendPacket(c, len, timer);
if (rc == FAILURE) }
if (rc == FAILURE) {
goto exit; // there was a problem goto exit; // there was a problem
} }
}
break; break;
} }
case PUBREC: case PUBREC:
case PUBREL: case PUBREL: {
{
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
rc = FAILURE; rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->buf, c->buf_size, } else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
(packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0) {
rc = FAILURE; rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet } else if ((rc = sendPacket(c, len, timer)) != SUCCESS) { // send the PUBREL packet
rc = FAILURE; // there was a problem rc = FAILURE; // there was a problem
if (rc == FAILURE) }
if (rc == FAILURE) {
goto exit; // there was a problem goto exit; // there was a problem
}
break; break;
} }
case PUBCOMP: case PUBCOMP:
break; break;
case PINGRESP: case PINGRESP:
c->ping_outstanding = 0; c->ping_outstanding = 0;
break; break;
@ -330,15 +399,18 @@ int cycle(MQTTClient* c, Timer* timer)
} }
exit: exit:
if (rc == SUCCESS)
if (rc == SUCCESS) {
rc = packet_type; rc = packet_type;
else if (c->isconnected) } else if (c->isconnected) {
MQTTCloseSession(c); MQTTCloseSession(c);
}
return rc; return rc;
} }
int MQTTYield(MQTTClient* c, int timeout_ms) int MQTTYield(MQTTClient *c, int timeout_ms)
{ {
int rc = SUCCESS; int rc = SUCCESS;
Timer timer; Timer timer;
@ -346,10 +418,8 @@ int MQTTYield(MQTTClient* c, int timeout_ms)
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms); TimerCountdownMS(&timer, timeout_ms);
do do {
{ if (cycle(c, &timer) < 0) {
if (cycle(c, &timer) < 0)
{
rc = FAILURE; rc = FAILURE;
break; break;
} }
@ -358,20 +428,30 @@ int MQTTYield(MQTTClient* c, int timeout_ms)
return rc; return rc;
} }
void MQTTRun(void* parm) void MQTTRun(void *parm)
{ {
Timer timer; Timer timer;
MQTTClient* c = (MQTTClient*)parm; MQTTClient *c = (MQTTClient *)parm;
TimerInit(&timer); TimerInit(&timer);
while (1) while (1) {
{ TimerCountdownMS(&timer, CONFIG_MQTT_RECV_CYCLE); /* Don't wait too long if no traffic is incoming */
TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
cycle(c, &timer);
int rc = cycle(c, &timer);
if (rc == FAILURE) {
ESP_LOGE(TAG, "MQTTRun cycle failed");
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
vTaskDelete(NULL);
}
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
@ -380,32 +460,29 @@ void MQTTRun(void* parm)
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
int MQTTStartTask(MQTTClient* client) int MQTTStartTask(MQTTClient *client)
{ {
return ThreadStart(&client->thread, &MQTTRun, client); return ThreadStart(&client->thread, &MQTTRun, client);
} }
#endif #endif
int waitfor(MQTTClient* c, int packet_type, Timer* timer) int waitfor(MQTTClient *c, int packet_type, Timer *timer)
{ {
int rc = FAILURE; int rc = FAILURE;
do do {
{ if (TimerIsExpired(timer)) {
if (TimerIsExpired(timer))
break; // we timed out break; // we timed out
rc = cycle(c, timer);
} }
while (rc != packet_type && rc >= 0);
rc = cycle(c, timer);
} while (rc != packet_type && rc >= 0);
return rc; return rc;
} }
int MQTTConnectWithResults(MQTTClient *c, MQTTPacket_connectData *options, MQTTConnackData *data)
int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
{ {
Timer connect_timer; Timer connect_timer;
int rc = FAILURE; int rc = FAILURE;
@ -415,39 +492,47 @@ int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTC
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (c->isconnected) /* don't send connect packet again if we are already connected */
if (c->isconnected) { /* don't send connect packet again if we are already connected */
goto exit; goto exit;
}
TimerInit(&connect_timer); TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, c->command_timeout_ms); TimerCountdownMS(&connect_timer, c->command_timeout_ms);
if (options == 0) if (options == 0) {
options = &default_options; /* set default options if none were supplied */ options = &default_options; /* set default options if none were supplied */
}
c->keepAliveInterval = options->keepAliveInterval; c->keepAliveInterval = options->keepAliveInterval;
c->cleansession = options->cleansession; c->cleansession = options->cleansession;
TimerCountdown(&c->last_received, c->keepAliveInterval); TimerCountdown(&c->last_received, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0) {
goto exit; goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet }
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) { // send the connect packet
goto exit; // there was a problem goto exit; // there was a problem
}
// this will be a blocking call, wait for the connack // this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK) if (waitfor(c, CONNACK, &connect_timer) == CONNACK) {
{
data->rc = 0; data->rc = 0;
data->sessionPresent = 0; data->sessionPresent = 0;
if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) {
rc = data->rc; rc = data->rc;
else } else {
rc = FAILURE; rc = FAILURE;
} }
else } else {
rc = FAILURE; rc = FAILURE;
}
exit: exit:
if (rc == SUCCESS)
{ if (rc == SUCCESS) {
c->isconnected = 1; c->isconnected = 1;
c->ping_outstanding = 0; c->ping_outstanding = 0;
} }
@ -460,57 +545,54 @@ exit:
} }
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options) int MQTTConnect(MQTTClient *c, MQTTPacket_connectData *options)
{ {
MQTTConnackData data; MQTTConnackData data;
return MQTTConnectWithResults(c, options, &data); return MQTTConnectWithResults(c, options, &data);
} }
int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler) int MQTTSetMessageHandler(MQTTClient *c, const char *topicFilter, messageHandler messageHandler)
{ {
int rc = FAILURE; int rc = FAILURE;
int i = -1; int i = -1;
/* first check for an existing matching slot */ /* first check for an existing matching slot */
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
{ if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) {
if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0) if (messageHandler == NULL) { /* remove existing */
{
if (messageHandler == NULL) /* remove existing */
{
c->messageHandlers[i].topicFilter = NULL; c->messageHandlers[i].topicFilter = NULL;
c->messageHandlers[i].fp = NULL; c->messageHandlers[i].fp = NULL;
} }
rc = SUCCESS; /* return i when adding new subscription */ rc = SUCCESS; /* return i when adding new subscription */
break; break;
} }
} }
/* if no existing, look for empty slot (unless we are removing) */ /* if no existing, look for empty slot (unless we are removing) */
if (messageHandler != NULL) { if (messageHandler != NULL) {
if (rc == FAILURE) if (rc == FAILURE) {
{ for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) if (c->messageHandlers[i].topicFilter == NULL) {
{
if (c->messageHandlers[i].topicFilter == NULL)
{
rc = SUCCESS; rc = SUCCESS;
break; break;
} }
} }
} }
if (i < MAX_MESSAGE_HANDLERS)
{ if (i < MAX_MESSAGE_HANDLERS) {
c->messageHandlers[i].topicFilter = topicFilter; c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler; c->messageHandlers[i].fp = messageHandler;
} }
} }
return rc; return rc;
} }
int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos, int MQTTSubscribeWithResults(MQTTClient *c, const char *topicFilter, enum QoS qos,
messageHandler messageHandler, MQTTSubackData* data) messageHandler messageHandler, MQTTSubackData *data)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
@ -521,36 +603,44 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected)
if (!c->isconnected) {
goto exit; goto exit;
}
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos); len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int *)&qos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback if (len <= 0) {
{ goto exit;
}
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet
goto exit; // there was a problem
}
if (waitfor(c, SUBACK, &timer) == SUBACK) { // wait for suback
int count = 0; int count = 0;
unsigned short mypacketid; unsigned short mypacketid;
data->grantedQoS = QOS0; data->grantedQoS = QOS0;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
{ if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int *)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1) {
if (data->grantedQoS != 0x80) if (data->grantedQoS != 0x80) {
rc = MQTTSetMessageHandler(c, topicFilter, messageHandler); rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
} }
} }
else } else {
rc = FAILURE; rc = FAILURE;
}
exit: exit:
if (rc == FAILURE) { if (rc == FAILURE) {
MQTTCloseSession(c); MQTTCloseSession(c);
} }
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
@ -558,7 +648,7 @@ exit:
} }
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos, int MQTTSubscribe(MQTTClient *c, const char *topicFilter, enum QoS qos,
messageHandler messageHandler) messageHandler messageHandler)
{ {
MQTTSubackData data; MQTTSubackData data;
@ -566,7 +656,7 @@ int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
} }
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) int MQTTUnsubscribe(MQTTClient *c, const char *topicFilter)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
@ -577,32 +667,39 @@ int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected)
if (!c->isconnected) {
goto exit; goto exit;
}
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0) {
goto exit; goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet }
goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet
{ goto exit; // there was a problem
}
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) {
unsigned short mypacketid; // should be the same as the packetid above unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
{ if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) {
/* remove the subscription message handler associated with this topic, if there is one */ /* remove the subscription message handler associated with this topic, if there is one */
MQTTSetMessageHandler(c, topicFilter, NULL); MQTTSetMessageHandler(c, topicFilter, NULL);
} }
} } else {
else
rc = FAILURE; rc = FAILURE;
}
exit: exit:
if (rc == FAILURE)
if (rc == FAILURE) {
MQTTCloseSession(c); MQTTCloseSession(c);
}
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
@ -610,7 +707,7 @@ exit:
} }
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) int MQTTPublish(MQTTClient *c, const char *topicName, MQTTMessage *message)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; Timer timer;
@ -621,50 +718,59 @@ int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected)
if (!c->isconnected) {
goto exit; goto exit;
}
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2) if (message->qos == QOS1 || message->qos == QOS2) {
message->id = getNextPacketId(c); message->id = getNextPacketId(c);
}
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen); topic, (unsigned char *)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (message->qos == QOS1) if (len <= 0) {
{ goto exit;
if (waitfor(c, PUBACK, &timer) == PUBACK) }
{
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) { // send the subscribe packet
goto exit; // there was a problem
}
if (message->qos == QOS1) {
if (waitfor(c, PUBACK, &timer) == PUBACK) {
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
rc = FAILURE; rc = FAILURE;
} }
else } else {
rc = FAILURE; rc = FAILURE;
} }
else if (message->qos == QOS2) } else if (message->qos == QOS2) {
{ if (waitfor(c, PUBCOMP, &timer) == PUBCOMP) {
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
unsigned short mypacketid; unsigned short mypacketid;
unsigned char dup, type; unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) {
rc = FAILURE; rc = FAILURE;
} }
else } else {
rc = FAILURE; rc = FAILURE;
} }
}
exit: exit:
if (rc == FAILURE)
if (rc == FAILURE) {
MQTTCloseSession(c); MQTTCloseSession(c);
}
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
@ -672,7 +778,7 @@ exit:
} }
int MQTTDisconnect(MQTTClient* c) int MQTTDisconnect(MQTTClient *c)
{ {
int rc = FAILURE; int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete Timer timer; // we might wait for incomplete incoming publishes to complete
@ -685,8 +791,11 @@ int MQTTDisconnect(MQTTClient* c)
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->buf, c->buf_size); len = MQTTSerialize_disconnect(c->buf, c->buf_size);
if (len > 0)
if (len > 0) {
rc = sendPacket(c, len, &timer); // send the disconnect packet rc = sendPacket(c, len, &timer); // send the disconnect packet
}
MQTTCloseSession(c); MQTTCloseSession(c);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)

90
components/mqtt/paho/MQTTClient-C/src/MQTTClient.h Executable file → Normal file
View File

@ -20,18 +20,18 @@
#define MQTT_CLIENT_H #define MQTT_CLIENT_H
#if defined(__cplusplus) #if defined(__cplusplus)
extern "C" { extern "C" {
#endif #endif
#if defined(WIN32_DLL) || defined(WIN64_DLL) #if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport) #define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport) #define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO) #elif defined(LINUX_SO)
#define DLLImport extern #define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default"))) #define DLLExport __attribute__ ((visibility ("default")))
#else #else
#define DLLImport #define DLLImport
#define DLLExport #define DLLExport
#endif #endif
#include "MQTTPacket.h" #include "MQTTPacket.h"
@ -51,7 +51,7 @@
#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */ #define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
#endif #endif
enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 }; enum QoS { QOS0, QOS1, QOS2, SUBFAIL = 0x80 };
/* all failure return codes must be negative */ /* all failure return codes must be negative */
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 }; enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
@ -67,14 +67,13 @@ typedef struct Network
/* The Timer structure must be defined in the platform specific header, /* The Timer structure must be defined in the platform specific header,
* and have the following functions to operate on it. */ * and have the following functions to operate on it. */
extern void TimerInit(Timer*); extern void TimerInit(Timer *);
extern char TimerIsExpired(Timer*); extern char TimerIsExpired(Timer *);
extern void TimerCountdownMS(Timer*, unsigned int); extern void TimerCountdownMS(Timer *, unsigned int);
extern void TimerCountdown(Timer*, unsigned int); extern void TimerCountdown(Timer *, unsigned int);
extern int TimerLeftMS(Timer*); extern int TimerLeftMS(Timer *);
typedef struct MQTTMessage typedef struct MQTTMessage {
{
enum QoS qos; enum QoS qos;
unsigned char retained; unsigned char retained;
unsigned char dup; unsigned char dup;
@ -83,27 +82,23 @@ typedef struct MQTTMessage
size_t payloadlen; size_t payloadlen;
} MQTTMessage; } MQTTMessage;
typedef struct MessageData typedef struct MessageData {
{ MQTTMessage *message;
MQTTMessage* message; MQTTString *topicName;
MQTTString* topicName;
} MessageData; } MessageData;
typedef struct MQTTConnackData typedef struct MQTTConnackData {
{
unsigned char rc; unsigned char rc;
unsigned char sessionPresent; unsigned char sessionPresent;
} MQTTConnackData; } MQTTConnackData;
typedef struct MQTTSubackData typedef struct MQTTSubackData {
{
enum QoS grantedQoS; enum QoS grantedQoS;
} MQTTSubackData; } MQTTSubackData;
typedef void (*messageHandler)(MessageData*); typedef void (*messageHandler)(MessageData *);
typedef struct MQTTClient typedef struct MQTTClient {
{
unsigned int next_packetid, unsigned int next_packetid,
command_timeout_ms; command_timeout_ms;
size_t buf_size, size_t buf_size,
@ -115,16 +110,15 @@ typedef struct MQTTClient
int isconnected; int isconnected;
int cleansession; int cleansession;
struct MessageHandlers struct MessageHandlers {
{ const char *topicFilter;
const char* topicFilter; void (*fp)(MessageData *);
void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */ } messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */
void (*defaultMessageHandler) (MessageData*); void (*defaultMessageHandler)(MessageData *);
Network* ipstack; Network *ipstack;
Timer last_sent, last_received; Timer last_sent, last_received, ping_wait;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
Mutex mutex; Mutex mutex;
Thread thread; Thread thread;
@ -141,23 +135,23 @@ typedef struct MQTTClient
* @param command_timeout_ms * @param command_timeout_ms
* @param * @param
*/ */
DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms, DLLExport bool MQTTClientInit(MQTTClient *client, Network *network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size); unsigned char *sendbuf, size_t sendbuf_size, unsigned char *readbuf, size_t readbuf_size);
/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this * The nework object must be connected to the network endpoint before calling this
* @param options - connect options * @param options - connect options
* @return success code * @return success code
*/ */
DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options, DLLExport int MQTTConnectWithResults(MQTTClient *client, MQTTPacket_connectData *options,
MQTTConnackData* data); MQTTConnackData *data);
/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this * The nework object must be connected to the network endpoint before calling this
* @param options - connect options * @param options - connect options
* @return success code * @return success code
*/ */
DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options); DLLExport int MQTTConnect(MQTTClient *client, MQTTPacket_connectData *options);
/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs /** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param client - the client object to use * @param client - the client object to use
@ -165,7 +159,7 @@ DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);
* @param message - the message to send * @param message - the message to send
* @return success code * @return success code
*/ */
DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*); DLLExport int MQTTPublish(MQTTClient *client, const char *, MQTTMessage *);
/** MQTT SetMessageHandler - set or remove a per topic message handler /** MQTT SetMessageHandler - set or remove a per topic message handler
* @param client - the client object to use * @param client - the client object to use
@ -173,7 +167,7 @@ DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);
* @param messageHandler - pointer to the message handler function or NULL to remove * @param messageHandler - pointer to the message handler function or NULL to remove
* @return success code * @return success code
*/ */
DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler); DLLExport int MQTTSetMessageHandler(MQTTClient *c, const char *topicFilter, messageHandler messageHandler);
/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use * @param client - the client object to use
@ -181,7 +175,7 @@ DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, mess
* @param message - the message to send * @param message - the message to send
* @return success code * @return success code
*/ */
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler); DLLExport int MQTTSubscribe(MQTTClient *client, const char *topicFilter, enum QoS, messageHandler);
/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning. /** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use * @param client - the client object to use
@ -190,33 +184,33 @@ DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum Qo
* @param data - suback granted QoS returned * @param data - suback granted QoS returned
* @return success code * @return success code
*/ */
DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data); DLLExport int MQTTSubscribeWithResults(MQTTClient *client, const char *topicFilter, enum QoS, messageHandler, MQTTSubackData *data);
/** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning. /** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
* @param client - the client object to use * @param client - the client object to use
* @param topicFilter - the topic filter to unsubscribe from * @param topicFilter - the topic filter to unsubscribe from
* @return success code * @return success code
*/ */
DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter); DLLExport int MQTTUnsubscribe(MQTTClient *client, const char *topicFilter);
/** MQTT Disconnect - send an MQTT disconnect packet and close the connection /** MQTT Disconnect - send an MQTT disconnect packet and close the connection
* @param client - the client object to use * @param client - the client object to use
* @return success code * @return success code
*/ */
DLLExport int MQTTDisconnect(MQTTClient* client); DLLExport int MQTTDisconnect(MQTTClient *client);
/** MQTT Yield - MQTT background /** MQTT Yield - MQTT background
* @param client - the client object to use * @param client - the client object to use
* @param time - the time, in milliseconds, to yield for * @param time - the time, in milliseconds, to yield for
* @return success code * @return success code
*/ */
DLLExport int MQTTYield(MQTTClient* client, int time); DLLExport int MQTTYield(MQTTClient *client, int time);
/** MQTT isConnected /** MQTT isConnected
* @param client - the client object to use * @param client - the client object to use
* @return truth value indicating whether the client is connected to the server * @return truth value indicating whether the client is connected to the server
*/ */
static inline DLLExport int MQTTIsConnected(MQTTClient* client) static inline DLLExport int MQTTIsConnected(MQTTClient *client)
{ {
return client->isconnected; return client->isconnected;
} }
@ -226,11 +220,11 @@ static inline DLLExport int MQTTIsConnected(MQTTClient* client)
* @param client - the client object to use * @param client - the client object to use
* @return success code * @return success code
*/ */
DLLExport int MQTTStartTask(MQTTClient* client); DLLExport int MQTTStartTask(MQTTClient *client);
#endif #endif
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
#endif #endif