fix(mqtt): Small fixes and upgrade.

o Upgrading to latest upstream git version(29ab2aa). Mostly
  updating FreeRTOS types.
o Adding `MQTTPacket_msgTypesToString` for debugging.
o Adding `taskYIELD` in `MutexUnlock`. This was required for my
  sending thread to be able to get in between calls to `cycle` in
  `MQTTRun`.
o Using CPPFLAGS to control which platform include to use. Make
  the code closed to upstream's version.
o Replacing the few remaining tabs with spaces.

Merges https://github.com/espressif/ESP8266_RTOS_SDK/pull/203
This commit is contained in:
Trygve Laugstøl
2018-06-01 15:54:27 +02:00
committed by Wu Jian Gang
parent 1dfd1b19fc
commit 656078b270
8 changed files with 88 additions and 61 deletions

View File

@ -21,6 +21,8 @@
## nopoll(websocket) ## nopoll(websocket)
## paho MQTT ## paho MQTT
- Source: https://github.com/eclipse/paho.mqtt.embedded-c
- Version: 29ab2aa
## spiffs ## spiffs

View File

@ -1 +1 @@
CPPFLAGS += -DMQTT_TASK CPPFLAGS += -DMQTT_TASK -DMQTTCLIENT_PLATFORM_HEADER=MQTTFreeRTOS.h

View File

@ -15,17 +15,16 @@
* Ian Craggs - convert to FreeRTOS * Ian Craggs - convert to FreeRTOS
*******************************************************************************/ *******************************************************************************/
#include <string.h>
#include <netdb.h>
#include "MQTTFreeRTOS.h" #include "MQTTFreeRTOS.h"
#include <string.h>
#include <netdb.h>
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg) int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{ {
int rc = 0; int rc = 0;
uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5); uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5);
unsigned portBASE_TYPE uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/ UBaseType_t uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/
rc = xTaskCreate(fn, /* The function that implements the task. */ rc = xTaskCreate(fn, /* The function that implements the task. */
"MQTTTask", /* Just a text name for the task to aid debugging. */ "MQTTTask", /* Just a text name for the task to aid debugging. */
@ -43,21 +42,22 @@ void MutexInit(Mutex* mutex)
mutex->sem = xSemaphoreCreateMutex(); mutex->sem = xSemaphoreCreateMutex();
} }
int MutexLock(Mutex* mutex) int MutexLock(Mutex* mutex)
{ {
return xSemaphoreTake(mutex->sem, portMAX_DELAY); return xSemaphoreTake(mutex->sem, portMAX_DELAY);
} }
int MutexUnlock(Mutex* mutex) int MutexUnlock(Mutex* mutex)
{ {
return xSemaphoreGive(mutex->sem); int x = xSemaphoreGive(mutex->sem);
taskYIELD();
return x;
} }
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms) void TimerCountdownMS(Timer* timer, unsigned int timeout_ms)
{ {
timer->xTicksToWait = timeout_ms / portTICK_RATE_MS; /* convert milliseconds to ticks */ timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */ vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */
} }
@ -71,7 +71,7 @@ void TimerCountdown(Timer* timer, unsigned int timeout)
int TimerLeftMS(Timer* timer) int TimerLeftMS(Timer* timer)
{ {
xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */ xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */
return timer->xTicksToWait * portTICK_RATE_MS; return timer->xTicksToWait * portTICK_PERIOD_MS;
} }

View File

@ -25,14 +25,16 @@
#include "openssl/ssl.h" #include "openssl/ssl.h"
#endif #endif
typedef struct Timer { typedef struct Timer
portTickType xTicksToWait; {
xTimeOutType xTimeOut; TickType_t xTicksToWait;
TimeOut_t xTimeOut;
} Timer; } Timer;
typedef struct Network Network; typedef struct Network Network;
struct Network { struct Network
{
int my_socket; int my_socket;
int (*mqttread)(Network*, unsigned char*, unsigned int, unsigned int); int (*mqttread)(Network*, unsigned char*, unsigned int, unsigned int);
int (*mqttwrite)(Network*, unsigned char*, unsigned int, unsigned int); int (*mqttwrite)(Network*, unsigned char*, unsigned int, unsigned int);
@ -51,16 +53,18 @@ void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int); void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*); int TimerLeftMS(Timer*);
typedef struct Mutex { typedef struct Mutex
xSemaphoreHandle sem; {
SemaphoreHandle_t sem;
} Mutex; } Mutex;
void MutexInit(Mutex*); void MutexInit(Mutex*);
int MutexLock(Mutex*); int MutexLock(Mutex*);
int MutexUnlock(Mutex*); int MutexUnlock(Mutex*);
typedef struct Thread { typedef struct Thread
xTaskHandle task; {
TaskHandle_t task;
} Thread; } Thread;
int ThreadStart(Thread*, void (*fn)(void*), void* arg); int ThreadStart(Thread*, void (*fn)(void*), void* arg);
@ -85,6 +89,7 @@ void NetworkInit(Network*);
*/ */
int NetworkConnect(Network* n, char* addr, int port); int NetworkConnect(Network* n, char* addr, int port);
#ifdef CONFIG_SSL_USING_MBEDTLS
typedef struct ssl_ca_crt_key { typedef struct ssl_ca_crt_key {
unsigned char* cacrt; unsigned char* cacrt;
unsigned int cacrt_len; unsigned int cacrt_len;
@ -94,7 +99,6 @@ typedef struct ssl_ca_crt_key {
unsigned int key_len; unsigned int key_len;
} ssl_ca_crt_key_t; } ssl_ca_crt_key_t;
#ifdef CONFIG_SSL_USING_MBEDTLS
/** /**
* @brief Initialize the network structure for SSL connection * @brief Initialize the network structure for SSL connection
* *

View File

@ -55,7 +55,7 @@ static int sendPacket(MQTTClient* c, int length, Timer* timer)
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms, void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size) unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{ {
int i; int i;
c->ipstack = network; c->ipstack = network;
@ -71,11 +71,11 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou
c->cleansession = 0; c->cleansession = 0;
c->ping_outstanding = 0; c->ping_outstanding = 0;
c->defaultMessageHandler = NULL; c->defaultMessageHandler = NULL;
c->next_packetid = 1; c->next_packetid = 1;
TimerInit(&c->last_sent); TimerInit(&c->last_sent);
TimerInit(&c->last_received); TimerInit(&c->last_received);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexInit(&c->mutex); MutexInit(&c->mutex);
#endif #endif
} }
@ -346,44 +346,43 @@ int MQTTYield(MQTTClient* c, int timeout_ms)
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms); TimerCountdownMS(&timer, timeout_ms);
do do
{ {
if (cycle(c, &timer) < 0) if (cycle(c, &timer) < 0)
{ {
rc = FAILURE; rc = FAILURE;
break; break;
} }
} while (!TimerIsExpired(&timer)); } while (!TimerIsExpired(&timer));
return rc; return rc;
} }
void MQTTRun(void* parm) void MQTTRun(void* parm)
{ {
Timer timer; Timer timer;
MQTTClient* c = (MQTTClient*)parm; MQTTClient* c = (MQTTClient*)parm;
TimerInit(&timer); TimerInit(&timer);
while (1) while (1)
{ {
TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */ cycle(c, &timer);
cycle(c, &timer);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
} }
} }
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
int MQTTStartTask(MQTTClient* client) int MQTTStartTask(MQTTClient* client)
{ {
return ThreadStart(&client->thread, &MQTTRun, client); return ThreadStart(&client->thread, &MQTTRun, client);
} }
#endif #endif
@ -414,10 +413,10 @@ int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTC
int len = 0; int len = 0;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (c->isconnected) /* don't send connect packet again if we are already connected */ if (c->isconnected) /* don't send connect packet again if we are already connected */
goto exit; goto exit;
TimerInit(&connect_timer); TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, c->command_timeout_ms); TimerCountdownMS(&connect_timer, c->command_timeout_ms);
@ -454,7 +453,7 @@ exit:
} }
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
return rc; return rc;
@ -520,10 +519,10 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
topic.cstring = (char *)topicFilter; topic.cstring = (char *)topicFilter;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
@ -549,10 +548,11 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
rc = FAILURE; rc = FAILURE;
exit: exit:
if (rc == FAILURE) if (rc == FAILURE) {
MQTTCloseSession(c); MQTTCloseSession(c);
}
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
return rc; return rc;
} }
@ -575,10 +575,10 @@ int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
int len = 0; int len = 0;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
@ -604,7 +604,7 @@ exit:
if (rc == FAILURE) if (rc == FAILURE)
MQTTCloseSession(c); MQTTCloseSession(c);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
return rc; return rc;
} }
@ -619,10 +619,10 @@ int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
int len = 0; int len = 0;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
if (!c->isconnected) if (!c->isconnected)
goto exit; goto exit;
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
@ -666,7 +666,7 @@ exit:
if (rc == FAILURE) if (rc == FAILURE)
MQTTCloseSession(c); MQTTCloseSession(c);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
return rc; return rc;
} }
@ -679,18 +679,18 @@ int MQTTDisconnect(MQTTClient* c)
int len = 0; int len = 0;
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexLock(&c->mutex); MutexLock(&c->mutex);
#endif #endif
TimerInit(&timer); TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms); TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->buf, c->buf_size); len = MQTTSerialize_disconnect(c->buf, c->buf_size);
if (len > 0) if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet rc = sendPacket(c, len, &timer); // send the disconnect packet
MQTTCloseSession(c); MQTTCloseSession(c);
#if defined(MQTT_TASK) #if defined(MQTT_TASK)
MutexUnlock(&c->mutex); MutexUnlock(&c->mutex);
#endif #endif
return rc; return rc;
} }

View File

@ -16,8 +16,8 @@
* Ian Craggs - add setMessageHandler function * Ian Craggs - add setMessageHandler function
*******************************************************************************/ *******************************************************************************/
#if !defined(__MQTT_CLIENT_C_) #if !defined(MQTT_CLIENT_H)
#define __MQTT_CLIENT_C_ #define MQTT_CLIENT_H
#if defined(__cplusplus) #if defined(__cplusplus)
extern "C" { extern "C" {
@ -35,8 +35,6 @@
#endif #endif
#include "MQTTPacket.h" #include "MQTTPacket.h"
#include "stdio.h"
#include "MQTTFreeRTOS.h"
#if defined(MQTTCLIENT_PLATFORM_HEADER) #if defined(MQTTCLIENT_PLATFORM_HEADER)
/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value /* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value

View File

@ -410,3 +410,24 @@ exit:
return rc; return rc;
} }
const char* MQTTPacket_msgTypesToString(enum msgTypes msgType)
{
switch (msgType)
{
case CONNECT: return "CONNECT";
case CONNACK: return "CONNACK";
case PUBLISH: return "PUBLISH";
case PUBACK: return "PUBACK";
case PUBREC: return "PUBREC";
case PUBREL: return "PUBREL";
case PUBCOMP: return "PUBCOMP";
case SUBSCRIBE: return "SUBSCRIBE";
case SUBACK: return "SUBACK";
case UNSUBSCRIBE: return "UNSUBSCRIBE";
case UNSUBACK: return "UNSUBACK";
case PINGREQ: return "PINGREQ";
case PINGRESP: return "PINGRESP";
case DISCONNECT: return "DISCONNECT";
default: return NULL;
}
}

View File

@ -125,6 +125,8 @@ typedef struct {
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp); int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp);
const char* MQTTPacket_msgTypesToString(enum msgTypes);
#ifdef __cplusplus /* If this is a C++ compiler, use C linkage */ #ifdef __cplusplus /* If this is a C++ compiler, use C linkage */
} }
#endif #endif