Update the MQTT keep-alive demo to use a mocked timer-query function (#369)

Because we would like to show an example of how the coreMQTT library is used without an actual timer query function, the keep-alive demo is updated to used a mocked function that always returns 0. As such, MQTT_ReceiveLoop must be passed a timeout of 0 so that it runs for exactly 1 iteration. Therefore, MQTT_ReceiveLoop is called repeatedly until an acknowledgement packet from the broker is received or the maximum iterations are reached.
This commit is contained in:
Oscar Michael Abrina
2020-10-30 18:16:36 -07:00
committed by GitHub
parent 58adeb2c0f
commit 5a8f03b633

View File

@ -34,7 +34,7 @@
* It shows how the MQTT API can be used without the keep-alive feature, * It shows how the MQTT API can be used without the keep-alive feature,
* so that the application can implements its own keep-alive functionality * so that the application can implements its own keep-alive functionality
* for MQTT. The example is single threaded and uses statically allocated memory; * for MQTT. The example is single threaded and uses statically allocated memory;
* it uses QOS0, and therefore it does not implement any retransmission * it uses QoS1, and therefore it does not implement any retransmission
* mechanism for publish messages. * mechanism for publish messages.
* *
* !!! NOTE !!! * !!! NOTE !!!
@ -131,11 +131,6 @@
*/ */
#define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) ) #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) )
/**
* @brief Timeout for MQTT_ReceiveLoop in milliseconds.
*/
#define mqttexampleRECEIVE_LOOP_TIMEOUT_MS ( 500U )
/** /**
* @brief Keep alive time reported to the broker while establishing an MQTT connection. * @brief Keep alive time reported to the broker while establishing an MQTT connection.
* *
@ -144,23 +139,37 @@
* absence of sending any other control packets, the client MUST send a * absence of sending any other control packets, the client MUST send a
* PINGREQ Packet. * PINGREQ Packet.
*/ */
#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) #define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 16U )
/** /**
* @brief Time to wait before sending ping request to keep MQTT connection alive. * @brief Time to wait before sending ping request to keep the MQTT connection alive.
* *
* A PINGREQ is attempted to be sent at every ( #mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * A PINGREQ is attempted to be sent at every ( #mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 )
* seconds. This is to make sure that a PINGREQ is always sent before the timeout * seconds. This is to make sure that a PINGREQ is always sent before the timeout
* expires in broker. * expires in the broker.
*/ */
#define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( ( ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * 1000 ) ) ) #define mqttexamplePING_REQUEST_DELAY ( pdMS_TO_TICKS( ( ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * 1000 ) ) )
/** /**
* @brief Delay between MQTT publishes. Note that the receive loop also has a * @brief Time to wait before calling #MQTT_ReceiveLoop.
* timeout, so the total time between publishes is the sum of the two delays. The *
* keep-alive delay is added here so the keep-alive timer callback executes. * @note This delay is deliberately chosen so that the keep-alive timer callback
* is invoked if an expected control packet is not received within 2 iterations.
*/ */
#define mqttexampleDELAY_BETWEEN_PUBLISHES ( mqttexampleKEEP_ALIVE_DELAY + pdMS_TO_TICKS( 500U ) ) #define mqttexampleRECEIVE_LOOP_ITERATION_DELAY ( mqttexamplePING_REQUEST_DELAY / 2 )
/**
* @brief Time to wait before expecting ping response from the MQTT broker.
*
* This timeout provides some leeway so that the PINGRESP can be received within
* 4 iterations of #MQTT_ReceiveLoop since other control packets may be received first.
*/
#define mqttexamplePING_RESPONSE_DELAY ( mqttexampleRECEIVE_LOOP_ITERATION_DELAY * 4 )
/**
* @brief The number of iterations to call #MQTT_ReceiveLoop before failing.
*/
#define mqttexampleMAX_RECEIVE_LOOP_ITERATIONS ( 5U )
/** /**
* @brief Transport timeout in milliseconds for transport send and receive. * @brief Transport timeout in milliseconds for transport send and receive.
@ -176,7 +185,7 @@
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
#define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */ #define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */
#define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -246,11 +255,13 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext );
static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext );
/** /**
* @brief The timer query function provided to the MQTT context. * @brief The timer query function provided to the MQTT context that always
* returns 0. This provides an example of how to use the MQTT library without
* implementing an actual timer.
* *
* @return Time in milliseconds. * @return 0.
*/ */
static uint32_t prvGetTimeMs( void ); static uint32_t prvMockedGetTime( void );
/** /**
* @brief Process a response or ack to an MQTT request (PING, SUBSCRIBE * @brief Process a response or ack to an MQTT request (PING, SUBSCRIBE
@ -271,18 +282,6 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
*/ */
static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo );
/**
* @brief Check if the amount of time waiting for PINGRESP has exceeded
* the specified timeout, then reset the keep-alive timer.
*
* This should only be called after a control packet has been sent.
*
* @param[in] pxTimer The auto-reload software timer for handling keep alive.
*
* @return The status returned by #xTimerReset.
*/
static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer );
/** /**
* @brief This callback is invoked through an auto-reload software timer. * @brief This callback is invoked through an auto-reload software timer.
* *
@ -291,7 +290,18 @@ static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer );
* *
* @param[in] pxTimer The auto-reload software timer for handling keep alive. * @param[in] pxTimer The auto-reload software timer for handling keep alive.
*/ */
static void prvKeepAliveTimerCallback( TimerHandle_t pxTimer ); static void prvPingReqTimerCallback( TimerHandle_t pxTimer );
/**
* @brief This callback is invoked through a software timer that is started by
* #prvPingReqTimerCallback.
*
* Its responsibility is to check that a PINGRESP has been received within
* the specified keep-alive timeout period.
*
* @param[in] pxTimer The auto-reload software timer for handling keep alive.
*/
static void prvPingRespTimerCallback( TimerHandle_t pxTimer );
/** /**
* @brief The application callback function for getting the incoming publish * @brief The application callback function for getting the incoming publish
@ -318,12 +328,10 @@ static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ];
static uint8_t ucPingReqBuffer[ MQTT_PACKET_PINGREQ_SIZE ]; static uint8_t ucPingReqBuffer[ MQTT_PACKET_PINGREQ_SIZE ];
/** /**
* @brief Global entry time into the application to use as a reference timestamp * @brief Packet Identifier generated when Publish request was sent to the broker;
* in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference * it is used to match received Publish ACK to the transmitted Publish packet.
* between the current time and the global entry time. This will reduce the chances
* of overflow for the 32 bit unsigned integer used for holding the timestamp.
*/ */
static uint32_t ulGlobalEntryTimeMs; static uint16_t usPublishPacketIdentifier;
/** /**
* @brief Packet Identifier generated when Subscribe request was sent to the broker; * @brief Packet Identifier generated when Subscribe request was sent to the broker;
@ -357,14 +365,26 @@ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] =
}; };
/** /**
* @brief Timer to handle the MQTT keep-alive mechanism. * @brief Auto-reload timer to send a PINGREQ packet every time
* #mqttexamplePING_REQUEST_DELAY ticks have passed.
*/ */
static TimerHandle_t xKeepAliveTimer; static TimerHandle_t xPingReqTimer;
/** /**
* @brief Storage space for xKeepAliveTimer. * @brief Storage space for xPingReqTimer.
*/ */
static StaticTimer_t xKeepAliveTimerBuffer; static StaticTimer_t xPingReqTimerBuffer;
/**
* @brief Auto-reload timer to check that a PINGRESP packet from the broker
* was received before the timeout period.
*/
static TimerHandle_t xPingRespTimer;
/**
* @brief Storage space for xPingRespTimer.
*/
static StaticTimer_t xPingRespTimerBuffer;
/** /**
* @brief Set to true when PINGREQ is sent then false when PINGRESP is received. * @brief Set to true when PINGREQ is sent then false when PINGRESP is received.
@ -372,14 +392,24 @@ static StaticTimer_t xKeepAliveTimerBuffer;
static volatile bool xWaitingForPingResp = false; static volatile bool xWaitingForPingResp = false;
/** /**
* @brief The last time when a PINGREQ was sent over the network. * @brief A flag indicating whether a PUBACK from the broker was received.
*/ */
static uint32_t ulPingReqSendTimeMs; static BaseType_t xReceivedPubAck = pdFALSE;
/** /**
* @brief Timeout for a pending PINGRESP from the MQTT broker. * @brief A flag indicating whether a SUBACK from the broker was received.
*/ */
static uint32_t ulPingRespTimeoutMs = ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * MILLISECONDS_PER_SECOND; static BaseType_t xReceivedSubAck = pdFALSE;
/**
* @brief A flag indicating whether an UNSUBACK from the broker was received.
*/
static BaseType_t xReceivedUnsubAck = pdFALSE;
/**
* @brief The number of times #MQTT_ReceiveLoop has been called in the loop.
*/
static uint32_t ulReceiveLoopIterations = 0;
/** /**
* @brief Static buffer used to hold an MQTT PINGREQ packet for keep-alive mechanism. * @brief Static buffer used to hold an MQTT PINGREQ packet for keep-alive mechanism.
@ -390,7 +420,7 @@ const static MQTTFixedBuffer_t xPingReqBuffer =
.size = MQTT_PACKET_PINGREQ_SIZE .size = MQTT_PACKET_PINGREQ_SIZE
}; };
/** /**
* @brief Static buffer used to hold MQTT messages being sent and received. * @brief Static buffer used to hold MQTT messages being sent and received.
*/ */
static MQTTFixedBuffer_t xBuffer = static MQTTFixedBuffer_t xBuffer =
@ -421,8 +451,7 @@ void vStartSimpleMQTTDemo( void )
static void prvMQTTDemoTask( void * pvParameters ) static void prvMQTTDemoTask( void * pvParameters )
{ {
uint32_t ulPublishCount = 0U, ulTopicCount = 0U; uint32_t ulTopicCount = 0U;
const uint32_t ulMaxPublishCount = 5UL;
NetworkContext_t xNetworkContext = { 0 }; NetworkContext_t xNetworkContext = { 0 };
MQTTContext_t xMQTTContext; MQTTContext_t xMQTTContext;
MQTTStatus_t xMQTTStatus; MQTTStatus_t xMQTTStatus;
@ -432,8 +461,6 @@ static void prvMQTTDemoTask( void * pvParameters )
/* Remove compiler warnings about unused parameters. */ /* Remove compiler warnings about unused parameters. */
( void ) pvParameters; ( void ) pvParameters;
ulGlobalEntryTimeMs = prvGetTimeMs();
/* Serialize a PINGREQ packet to send upon invoking the keep-alive timer /* Serialize a PINGREQ packet to send upon invoking the keep-alive timer
* callback. */ * callback. */
xMQTTStatus = MQTT_SerializePingreq( &xPingReqBuffer ); xMQTTStatus = MQTT_SerializePingreq( &xPingReqBuffer );
@ -457,17 +484,24 @@ static void prvMQTTDemoTask( void * pvParameters )
LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) ); LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
/* Create an auto-reload timer to handle keep-alive. */ /* Create timers to handle keep-alive. */
xKeepAliveTimer = xTimerCreateStatic( "KeepAliveTimer", xPingReqTimer = xTimerCreateStatic( "PingReqTimer",
mqttexampleKEEP_ALIVE_DELAY, mqttexamplePING_REQUEST_DELAY,
pdTRUE, pdTRUE,
( void * ) &xMQTTContext.transportInterface, ( void * ) &xMQTTContext.transportInterface,
prvKeepAliveTimerCallback, prvPingReqTimerCallback,
&xKeepAliveTimerBuffer ); &xPingReqTimerBuffer );
configASSERT( xKeepAliveTimer ); configASSERT( xPingReqTimer );
xPingRespTimer = xTimerCreateStatic( "PingRespTimer",
mqttexamplePING_RESPONSE_DELAY,
pdFALSE,
NULL,
prvPingRespTimerCallback,
&xPingRespTimerBuffer );
configASSERT( xPingRespTimer );
/* Start the timer for keep alive. */ /* Start the timer to send a PINGREQ. */
xTimerStatus = xTimerStart( xKeepAliveTimer, 0 ); xTimerStatus = xTimerStart( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS ); configASSERT( xTimerStatus == pdPASS );
/**************************** Subscribe. ******************************/ /**************************** Subscribe. ******************************/
@ -477,32 +511,56 @@ static void prvMQTTDemoTask( void * pvParameters )
* strategy declared in retry_utils.h. */ * strategy declared in retry_utils.h. */
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
/************************ Send PINGREQ packet. ************************/
/* Deliberately delay in order for the auto-reload timer to send a
* PINGREQ to the broker. */
vTaskDelay( mqttexamplePING_REQUEST_DELAY );
/********************* Publish and Receive Loop. **********************/ /********************* Publish and Receive Loop. **********************/
/* Publish messages with QOS0, send and process keep-alive messages. */ /* Publish messages with QOS1, send and process keep-alive messages. */
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTPublishToTopic( &xMQTTContext );
/* Process the incoming publish echo. Since the application subscribed to
* the same topic, the broker will send the same publish message back
* to the application. */
LogInfo( ( "Attempt to receive publish message from broker." ) );
while( xReceivedPubAck == pdFALSE )
{ {
LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) ); ulReceiveLoopIterations += 1U;
prvMQTTPublishToTopic( &xMQTTContext ); configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
/* Process the incoming publish echo. Since the application subscribed to vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
* the same topic, the broker will send the same publish message back
* to the application. */ xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U );
LogInfo( ( "Attempt to receive publish message from broker." ) );
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( xMQTTStatus == MQTTSuccess );
/* Leave Connection Idle for some time. */
LogInfo( ( "Keeping Connection Idle..." ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES );
} }
/* Reset after loop. */
ulReceiveLoopIterations = 0U;
xReceivedPubAck = pdFALSE;
/******************** Unsubscribe from the topic. *********************/ /******************** Unsubscribe from the topic. *********************/
LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTUnsubscribeFromTopic( &xMQTTContext ); prvMQTTUnsubscribeFromTopic( &xMQTTContext );
/* Process an incoming packet from the broker. */ /* Process an incoming packet from the broker. */
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); while( xReceivedUnsubAck == pdFALSE )
configASSERT( xMQTTStatus == MQTTSuccess ); {
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U );
configASSERT( xMQTTStatus == MQTTSuccess );
}
/* Reset after loop. */
ulReceiveLoopIterations = 0U;
xReceivedUnsubAck = pdFALSE;
/**************************** Disconnect. *****************************/ /**************************** Disconnect. *****************************/
@ -514,8 +572,10 @@ static void prvMQTTDemoTask( void * pvParameters )
xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( xMQTTStatus == MQTTSuccess );
/* Stop the keep-alive timer for the next iteration. */ /* Stop the keep-alive timers for the next iteration. */
xTimerStatus = xTimerStop( xKeepAliveTimer, 0 ); xTimerStatus = xTimerStop( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
xTimerStatus = xTimerStop( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS ); configASSERT( xTimerStatus == pdPASS );
/* Close the network connection. */ /* Close the network connection. */
@ -605,7 +665,7 @@ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
xTransport.recv = Plaintext_FreeRTOS_recv; xTransport.recv = Plaintext_FreeRTOS_recv;
/* Initialize MQTT library. */ /* Initialize MQTT library. */
xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); xResult = MQTT_Init( pxMQTTContext, &xTransport, prvMockedGetTime, prvEventCallback, &xBuffer );
configASSERT( xResult == MQTTSuccess ); configASSERT( xResult == MQTTSuccess );
/* Many fields not used in this demo so start with everything at 0. */ /* Many fields not used in this demo so start with everything at 0. */
@ -666,6 +726,7 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess; RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xRetryParams; RetryUtilsParams_t xRetryParams;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
BaseType_t xTimerStatus;
bool xFailedSubscribeToTopic = false; bool xFailedSubscribeToTopic = false;
uint32_t ulTopicCount = 0U; uint32_t ulTopicCount = 0U;
@ -676,8 +737,8 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
/* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to
* only one topic and uses QoS0. */ * only one topic and uses QoS1. */
xMQTTSubscription[ 0 ].qos = MQTTQoS0; xMQTTSubscription[ 0 ].qos = MQTTQoS1;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
@ -692,8 +753,8 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
* subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * subscribe packet then waiting for a subscribe acknowledgment (SUBACK).
* This client will then publish to the same topic it subscribed to, so it * This client will then publish to the same topic it subscribed to, so it
* will expect all the messages it sends to the broker to be sent back to it * will expect all the messages it sends to the broker to be sent back to it
* from the broker. This demo uses QOS0 in Subscribe. Therefore, the publish * from the broker. This demo uses QoS1 in Subscribe. Therefore, the publish
* messages received from the broker will have QOS0. */ * messages received from the broker will have QoS1. */
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) ); LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
xResult = MQTT_Subscribe( pxMQTTContext, xResult = MQTT_Subscribe( pxMQTTContext,
xMQTTSubscription, xMQTTSubscription,
@ -703,6 +764,10 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) ); LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) );
/* When a SUBSCRIBE packet has been sent, the keep-alive timer can be reset. */
xTimerStatus = xTimerReset( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
/* Process incoming packet from the broker. After sending the subscribe, the /* Process incoming packet from the broker. After sending the subscribe, the
* client may receive a publish before it receives a subscribe ack. Therefore, * client may receive a publish before it receives a subscribe ack. Therefore,
* call the generic incoming packet processing function. Since this demo is * call the generic incoming packet processing function. Since this demo is
@ -710,13 +775,25 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
* receiving a publish message before a subscribe ack is zero; but the application * receiving a publish message before a subscribe ack is zero; but the application
* must be ready to receive any packet. This demo uses the generic packet * must be ready to receive any packet. This demo uses the generic packet
* processing function everywhere to highlight this fact. */ * processing function everywhere to highlight this fact. */
xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); while( xReceivedSubAck == pdFALSE )
configASSERT( xResult == MQTTSuccess ); {
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
xResult = MQTT_ReceiveLoop( pxMQTTContext, 0U );
configASSERT( xResult == MQTTSuccess );
}
/* Reset in case another attempt to subscribe is needed. */
ulReceiveLoopIterations = 0U;
xReceivedSubAck = pdFALSE;
/* Reset flag before checking suback responses. */ /* Reset flag before checking suback responses. */
xFailedSubscribeToTopic = false; xFailedSubscribeToTopic = false;
/* Check if the recent subscription request has been rejected. #xTopicFilterContext /* Check if the recent subscription request has been rejected. #xTopicFilterContext
* is updated in the event callback to reflect the status of the SUBACK * is updated in the event callback to reflect the status of the SUBACK
* sent by the broker. It represents either the QoS level granted by the * sent by the broker. It represents either the QoS level granted by the
* server upon subscription or acknowledgement of server rejection of the * server upon subscription or acknowledgement of server rejection of the
@ -752,20 +829,23 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
/* Some fields are not used by this demo so start with everything at 0. */ /* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
/* This demo uses QoS0. */ /* This demo uses QoS1. */
xMQTTPublishInfo.qos = MQTTQoS0; xMQTTPublishInfo.qos = MQTTQoS1;
xMQTTPublishInfo.retain = false; xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
/* Send a PUBLISH packet. Packet ID is not used for a QoS0 publish. */ /* Get a unique packet id. */
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U ); usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
/* Send a PUBLISH packet. */
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier );
configASSERT( xResult == MQTTSuccess ); configASSERT( xResult == MQTTSuccess );
/* When a PUBLISH packet has been sent, the keep-alive timer can be reset. */ /* When a PUBLISH packet has been sent, the keep-alive timer can be reset. */
xTimerStatus = prvCheckTimeoutThenResetTimer( xKeepAliveTimer ); xTimerStatus = xTimerReset( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS ); configASSERT( xTimerStatus == pdPASS );
} }
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -774,6 +854,7 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
{ {
MQTTStatus_t xResult; MQTTStatus_t xResult;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
BaseType_t xTimerStatus;
/* Some fields are not used by this demo, so start with everything at 0. */ /* Some fields are not used by this demo, so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
@ -782,8 +863,8 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
/* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to
* only one topic and uses QoS0. */ * only one topic and uses QoS1. */
xMQTTSubscription[ 0 ].qos = MQTTQoS0; xMQTTSubscription[ 0 ].qos = MQTTQoS1;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
@ -795,8 +876,11 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
xMQTTSubscription, xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
usUnsubscribePacketIdentifier ); usUnsubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess ); configASSERT( xResult == MQTTSuccess );
/* When an UNSUBSCRIBE packet has been sent, the keep-alive timer can be reset. */
xTimerStatus = xTimerReset( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
} }
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
@ -807,7 +891,15 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
switch( pxIncomingPacket->type ) switch( pxIncomingPacket->type )
{ {
case MQTT_PACKET_TYPE_PUBACK:
xReceivedPubAck = pdTRUE;
LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( usPublishPacketIdentifier == usPacketId );
break;
case MQTT_PACKET_TYPE_SUBACK: case MQTT_PACKET_TYPE_SUBACK:
xReceivedSubAck = pdTRUE;
/* A SUBACK from the broker, containing the server response to our subscription request, has been received. /* A SUBACK from the broker, containing the server response to our subscription request, has been received.
* It contains the status code indicating server approval/rejection for the subscription to the single topic * It contains the status code indicating server approval/rejection for the subscription to the single topic
@ -830,6 +922,7 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
break; break;
case MQTT_PACKET_TYPE_UNSUBACK: case MQTT_PACKET_TYPE_UNSUBACK:
xReceivedUnsubAck = pdTRUE;
LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) ); LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) );
/* Make sure ACK packet identifier matches with Request packet identifier. */ /* Make sure ACK packet identifier matches with Request packet identifier. */
configASSERT( usUnsubscribePacketIdentifier == usPacketId ); configASSERT( usUnsubscribePacketIdentifier == usPacketId );
@ -869,7 +962,7 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
} }
else else
{ {
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.", LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n",
pxPublishInfo->topicNameLength, pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) ); pxPublishInfo->pTopicName ) );
} }
@ -877,52 +970,43 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer ) static void prvPingReqTimerCallback( TimerHandle_t pxTimer )
{
uint32_t now = 0U;
if( xWaitingForPingResp == true )
{
now = prvGetTimeMs();
/* Assert that the PINGRESP timeout has not expired. */
configASSERT( ( now - ulPingReqSendTimeMs ) <= ulPingRespTimeoutMs );
}
return xTimerReset( pxTimer, 0 );
}
/*-----------------------------------------------------------*/
static void prvKeepAliveTimerCallback( TimerHandle_t pxTimer )
{ {
TransportInterface_t * pxTransport; TransportInterface_t * pxTransport;
int32_t xTransportStatus; int32_t xTransportStatus;
uint32_t now = 0U; BaseType_t xTimerStatus;
pxTransport = ( TransportInterface_t * ) pvTimerGetTimerID( pxTimer ); pxTransport = ( TransportInterface_t * ) pvTimerGetTimerID( pxTimer );
if( xWaitingForPingResp == true ) /* Do not resend if waiting on a PINGRESP. */
if( xWaitingForPingResp == false )
{ {
now = prvGetTimeMs(); /* Send PINGREQ to broker */
/* Assert that the PINGRESP timeout has not expired. */ LogInfo( ( "Ping the MQTT broker." ) );
configASSERT( ( now - ulPingReqSendTimeMs ) <= ulPingRespTimeoutMs );
}
else
{
/* Send Ping Request to the broker. */
LogInfo( ( "Attempt to ping the MQTT broker." ) );
xTransportStatus = pxTransport->send( pxTransport->pNetworkContext, xTransportStatus = pxTransport->send( pxTransport->pNetworkContext,
( void * ) xPingReqBuffer.pBuffer, ( void * ) xPingReqBuffer.pBuffer,
xPingReqBuffer.size ); xPingReqBuffer.size );
configASSERT( ( size_t ) xTransportStatus == xPingReqBuffer.size ); configASSERT( ( size_t ) xTransportStatus == xPingReqBuffer.size );
ulPingReqSendTimeMs = prvGetTimeMs();
xWaitingForPingResp = true; xWaitingForPingResp = true;
/* Start the timer to expect a PINGRESP. */
xTimerStatus = xTimerStart( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
} }
} }
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
static void prvPingRespTimerCallback( TimerHandle_t pxTimer )
{
( void ) pxTimer;
/* Assert that a pending PINGRESP has been received. */
configASSERT( xWaitingForPingResp == false );
}
/*-----------------------------------------------------------*/
static void prvEventCallback( MQTTContext_t * pxMQTTContext, static void prvEventCallback( MQTTContext_t * pxMQTTContext,
MQTTPacketInfo_t * pxPacketInfo, MQTTPacketInfo_t * pxPacketInfo,
MQTTDeserializedInfo_t * pxDeserializedInfo ) MQTTDeserializedInfo_t * pxDeserializedInfo )
@ -942,22 +1026,9 @@ static void prvEventCallback( MQTTContext_t * pxMQTTContext,
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/
static uint32_t prvGetTimeMs( void ) static uint32_t prvMockedGetTime( void )
{ {
TickType_t xTickCount = 0; return 0;
uint32_t ulTimeMs = 0UL;
/* Get the current tick count. */
xTickCount = xTaskGetTickCount();
/* Convert the ticks to milliseconds. */
ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK;
/* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
* elapsed time in the application. */
ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs );
return ulTimeMs;
} }
/*-----------------------------------------------------------*/ /*-----------------------------------------------------------*/