diff --git a/components/VERSION.md b/components/VERSION.md index 3707a7a2..80c76730 100644 --- a/components/VERSION.md +++ b/components/VERSION.md @@ -21,6 +21,8 @@ ## nopoll(websocket) ## paho MQTT +- Source: https://github.com/eclipse/paho.mqtt.embedded-c +- Version: 29ab2aa ## spiffs diff --git a/components/mqtt/Makefile.projbuild b/components/mqtt/Makefile.projbuild index f18d5785..03de0cd5 100644 --- a/components/mqtt/Makefile.projbuild +++ b/components/mqtt/Makefile.projbuild @@ -1 +1 @@ -CPPFLAGS += -DMQTT_TASK +CPPFLAGS += -DMQTT_TASK -DMQTTCLIENT_PLATFORM_HEADER=MQTTFreeRTOS.h diff --git a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c index 7f6edef7..428e6d44 100755 --- a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c +++ b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.c @@ -15,17 +15,16 @@ * Ian Craggs - convert to FreeRTOS *******************************************************************************/ -#include - -#include - #include "MQTTFreeRTOS.h" +#include +#include + int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) { int rc = 0; uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5); - unsigned portBASE_TYPE uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/ + UBaseType_t uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/ rc = xTaskCreate(fn, /* The function that implements the task. */ "MQTTTask", /* Just a text name for the task to aid debugging. */ @@ -43,21 +42,22 @@ void MutexInit(Mutex* mutex) mutex->sem = xSemaphoreCreateMutex(); } - int MutexLock(Mutex* mutex) { return xSemaphoreTake(mutex->sem, portMAX_DELAY); } - int MutexUnlock(Mutex* mutex) { - return xSemaphoreGive(mutex->sem); + int x = xSemaphoreGive(mutex->sem); + taskYIELD(); + return x; } + void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) { - timer->xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ + timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */ vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */ } @@ -71,7 +71,7 @@ void TimerCountdown(Timer* timer, unsigned int timeout) int TimerLeftMS(Timer* timer) { xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */ - return timer->xTicksToWait * portTICK_RATE_MS; + return timer->xTicksToWait * portTICK_PERIOD_MS; } diff --git a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h index e819963b..415c786d 100755 --- a/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h +++ b/components/mqtt/paho/MQTTClient-C/src/FreeRTOS/MQTTFreeRTOS.h @@ -25,14 +25,16 @@ #include "openssl/ssl.h" #endif -typedef struct Timer { - portTickType xTicksToWait; - xTimeOutType xTimeOut; +typedef struct Timer +{ + TickType_t xTicksToWait; + TimeOut_t xTimeOut; } Timer; typedef struct Network Network; -struct Network { +struct Network +{ int my_socket; int (*mqttread)(Network*, unsigned char*, unsigned int, unsigned int); int (*mqttwrite)(Network*, unsigned char*, unsigned int, unsigned int); @@ -51,16 +53,18 @@ void TimerCountdownMS(Timer*, unsigned int); void TimerCountdown(Timer*, unsigned int); int TimerLeftMS(Timer*); -typedef struct Mutex { - xSemaphoreHandle sem; +typedef struct Mutex +{ + SemaphoreHandle_t sem; } Mutex; void MutexInit(Mutex*); int MutexLock(Mutex*); int MutexUnlock(Mutex*); -typedef struct Thread { - xTaskHandle task; +typedef struct Thread +{ + TaskHandle_t task; } Thread; int ThreadStart(Thread*, void (*fn)(void*), void* arg); @@ -85,6 +89,7 @@ void NetworkInit(Network*); */ int NetworkConnect(Network* n, char* addr, int port); +#ifdef CONFIG_SSL_USING_MBEDTLS typedef struct ssl_ca_crt_key { unsigned char* cacrt; unsigned int cacrt_len; @@ -94,7 +99,6 @@ typedef struct ssl_ca_crt_key { unsigned int key_len; } ssl_ca_crt_key_t; -#ifdef CONFIG_SSL_USING_MBEDTLS /** * @brief Initialize the network structure for SSL connection * diff --git a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c index e21af513..4c837eed 100755 --- a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c +++ b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.c @@ -55,7 +55,7 @@ static int sendPacket(MQTTClient* c, int length, Timer* timer) void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, - unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) + unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) { int i; c->ipstack = network; @@ -71,11 +71,11 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou c->cleansession = 0; c->ping_outstanding = 0; c->defaultMessageHandler = NULL; - c->next_packetid = 1; + c->next_packetid = 1; TimerInit(&c->last_sent); TimerInit(&c->last_received); #if defined(MQTT_TASK) - MutexInit(&c->mutex); + MutexInit(&c->mutex); #endif } @@ -346,44 +346,43 @@ int MQTTYield(MQTTClient* c, int timeout_ms) TimerInit(&timer); TimerCountdownMS(&timer, timeout_ms); - do + do { if (cycle(c, &timer) < 0) { rc = FAILURE; break; } - } while (!TimerIsExpired(&timer)); + } while (!TimerIsExpired(&timer)); return rc; } - void MQTTRun(void* parm) { - Timer timer; - MQTTClient* c = (MQTTClient*)parm; + Timer timer; + MQTTClient* c = (MQTTClient*)parm; - TimerInit(&timer); + TimerInit(&timer); - while (1) - { + while (1) + { + TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif - TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ - cycle(c, &timer); + cycle(c, &timer); #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif - } + } } #if defined(MQTT_TASK) int MQTTStartTask(MQTTClient* client) { - return ThreadStart(&client->thread, &MQTTRun, client); + return ThreadStart(&client->thread, &MQTTRun, client); } #endif @@ -414,10 +413,10 @@ int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTC int len = 0; #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif - if (c->isconnected) /* don't send connect packet again if we are already connected */ - goto exit; + if (c->isconnected) /* don't send connect packet again if we are already connected */ + goto exit; TimerInit(&connect_timer); TimerCountdownMS(&connect_timer, c->command_timeout_ms); @@ -454,7 +453,7 @@ exit: } #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif return rc; @@ -520,10 +519,10 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo topic.cstring = (char *)topicFilter; #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif - if (!c->isconnected) - goto exit; + if (!c->isconnected) + goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); @@ -549,10 +548,11 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo rc = FAILURE; exit: - if (rc == FAILURE) + if (rc == FAILURE) { MQTTCloseSession(c); + } #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif return rc; } @@ -575,10 +575,10 @@ int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter) int len = 0; #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif - if (!c->isconnected) - goto exit; + if (!c->isconnected) + goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); @@ -604,7 +604,7 @@ exit: if (rc == FAILURE) MQTTCloseSession(c); #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif return rc; } @@ -619,10 +619,10 @@ int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message) int len = 0; #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif - if (!c->isconnected) - goto exit; + if (!c->isconnected) + goto exit; TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); @@ -666,7 +666,7 @@ exit: if (rc == FAILURE) MQTTCloseSession(c); #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif return rc; } @@ -679,18 +679,18 @@ int MQTTDisconnect(MQTTClient* c) int len = 0; #if defined(MQTT_TASK) - MutexLock(&c->mutex); + MutexLock(&c->mutex); #endif TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); - len = MQTTSerialize_disconnect(c->buf, c->buf_size); + len = MQTTSerialize_disconnect(c->buf, c->buf_size); if (len > 0) rc = sendPacket(c, len, &timer); // send the disconnect packet MQTTCloseSession(c); #if defined(MQTT_TASK) - MutexUnlock(&c->mutex); + MutexUnlock(&c->mutex); #endif return rc; } diff --git a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h index 5bd187b9..d1287968 100755 --- a/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h +++ b/components/mqtt/paho/MQTTClient-C/src/MQTTClient.h @@ -16,8 +16,8 @@ * Ian Craggs - add setMessageHandler function *******************************************************************************/ -#if !defined(__MQTT_CLIENT_C_) -#define __MQTT_CLIENT_C_ +#if !defined(MQTT_CLIENT_H) +#define MQTT_CLIENT_H #if defined(__cplusplus) extern "C" { @@ -35,8 +35,6 @@ #endif #include "MQTTPacket.h" -#include "stdio.h" -#include "MQTTFreeRTOS.h" #if defined(MQTTCLIENT_PLATFORM_HEADER) /* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value diff --git a/components/mqtt/paho/MQTTPacket/src/MQTTPacket.c b/components/mqtt/paho/MQTTPacket/src/MQTTPacket.c index 4f1f95a7..c85bf9bf 100644 --- a/components/mqtt/paho/MQTTPacket/src/MQTTPacket.c +++ b/components/mqtt/paho/MQTTPacket/src/MQTTPacket.c @@ -410,3 +410,24 @@ exit: return rc; } +const char* MQTTPacket_msgTypesToString(enum msgTypes msgType) +{ + switch (msgType) + { + case CONNECT: return "CONNECT"; + case CONNACK: return "CONNACK"; + case PUBLISH: return "PUBLISH"; + case PUBACK: return "PUBACK"; + case PUBREC: return "PUBREC"; + case PUBREL: return "PUBREL"; + case PUBCOMP: return "PUBCOMP"; + case SUBSCRIBE: return "SUBSCRIBE"; + case SUBACK: return "SUBACK"; + case UNSUBSCRIBE: return "UNSUBSCRIBE"; + case UNSUBACK: return "UNSUBACK"; + case PINGREQ: return "PINGREQ"; + case PINGRESP: return "PINGRESP"; + case DISCONNECT: return "DISCONNECT"; + default: return NULL; + } +} diff --git a/components/mqtt/paho/MQTTPacket/src/MQTTPacket.h b/components/mqtt/paho/MQTTPacket/src/MQTTPacket.h index 0ee46231..9c161496 100644 --- a/components/mqtt/paho/MQTTPacket/src/MQTTPacket.h +++ b/components/mqtt/paho/MQTTPacket/src/MQTTPacket.h @@ -125,6 +125,8 @@ typedef struct { int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp); +const char* MQTTPacket_msgTypesToString(enum msgTypes); + #ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ } #endif