From 5a8f03b63335df7a08d7f9c01db0b579a37ec75c Mon Sep 17 00:00:00 2001 From: Oscar Michael Abrina Date: Fri, 30 Oct 2020 18:16:36 -0700 Subject: [PATCH] Update the MQTT keep-alive demo to use a mocked timer-query function (#369) Because we would like to show an example of how the coreMQTT library is used without an actual timer query function, the keep-alive demo is updated to used a mocked function that always returns 0. As such, MQTT_ReceiveLoop must be passed a timeout of 0 so that it runs for exactly 1 iteration. Therefore, MQTT_ReceiveLoop is called repeatedly until an acknowledgement packet from the broker is received or the maximum iterations are reached. --- .../DemoTasks/KeepAliveMQTTExample.c | 343 +++++++++++------- 1 file changed, 207 insertions(+), 136 deletions(-) diff --git a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/DemoTasks/KeepAliveMQTTExample.c b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/DemoTasks/KeepAliveMQTTExample.c index f9ac81015c..87359a41ed 100644 --- a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/DemoTasks/KeepAliveMQTTExample.c +++ b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/DemoTasks/KeepAliveMQTTExample.c @@ -34,7 +34,7 @@ * It shows how the MQTT API can be used without the keep-alive feature, * so that the application can implements its own keep-alive functionality * for MQTT. The example is single threaded and uses statically allocated memory; - * it uses QOS0, and therefore it does not implement any retransmission + * it uses QoS1, and therefore it does not implement any retransmission * mechanism for publish messages. * * !!! NOTE !!! @@ -131,11 +131,6 @@ */ #define mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ( pdMS_TO_TICKS( 5000U ) ) -/** - * @brief Timeout for MQTT_ReceiveLoop in milliseconds. - */ -#define mqttexampleRECEIVE_LOOP_TIMEOUT_MS ( 500U ) - /** * @brief Keep alive time reported to the broker while establishing an MQTT connection. * @@ -144,23 +139,37 @@ * absence of sending any other control packets, the client MUST send a * PINGREQ Packet. */ -#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 60U ) +#define mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS ( 16U ) /** - * @brief Time to wait before sending ping request to keep MQTT connection alive. + * @brief Time to wait before sending ping request to keep the MQTT connection alive. * * A PINGREQ is attempted to be sent at every ( #mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * seconds. This is to make sure that a PINGREQ is always sent before the timeout - * expires in broker. + * expires in the broker. */ -#define mqttexampleKEEP_ALIVE_DELAY ( pdMS_TO_TICKS( ( ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * 1000 ) ) ) +#define mqttexamplePING_REQUEST_DELAY ( pdMS_TO_TICKS( ( ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * 1000 ) ) ) /** - * @brief Delay between MQTT publishes. Note that the receive loop also has a - * timeout, so the total time between publishes is the sum of the two delays. The - * keep-alive delay is added here so the keep-alive timer callback executes. + * @brief Time to wait before calling #MQTT_ReceiveLoop. + * + * @note This delay is deliberately chosen so that the keep-alive timer callback + * is invoked if an expected control packet is not received within 2 iterations. */ -#define mqttexampleDELAY_BETWEEN_PUBLISHES ( mqttexampleKEEP_ALIVE_DELAY + pdMS_TO_TICKS( 500U ) ) +#define mqttexampleRECEIVE_LOOP_ITERATION_DELAY ( mqttexamplePING_REQUEST_DELAY / 2 ) + +/** + * @brief Time to wait before expecting ping response from the MQTT broker. + * + * This timeout provides some leeway so that the PINGRESP can be received within + * 4 iterations of #MQTT_ReceiveLoop since other control packets may be received first. + */ +#define mqttexamplePING_RESPONSE_DELAY ( mqttexampleRECEIVE_LOOP_ITERATION_DELAY * 4 ) + +/** + * @brief The number of iterations to call #MQTT_ReceiveLoop before failing. + */ +#define mqttexampleMAX_RECEIVE_LOOP_ITERATIONS ( 5U ) /** * @brief Transport timeout in milliseconds for transport send and receive. @@ -176,7 +185,7 @@ /*-----------------------------------------------------------*/ -#define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */ +#define MILLISECONDS_PER_SECOND ( 1000U ) /**< @brief Milliseconds per second. */ #define MILLISECONDS_PER_TICK ( MILLISECONDS_PER_SECOND / configTICK_RATE_HZ ) /**< Milliseconds per FreeRTOS tick. */ /*-----------------------------------------------------------*/ @@ -246,11 +255,13 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ); static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ); /** - * @brief The timer query function provided to the MQTT context. + * @brief The timer query function provided to the MQTT context that always + * returns 0. This provides an example of how to use the MQTT library without + * implementing an actual timer. * - * @return Time in milliseconds. + * @return 0. */ -static uint32_t prvGetTimeMs( void ); +static uint32_t prvMockedGetTime( void ); /** * @brief Process a response or ack to an MQTT request (PING, SUBSCRIBE @@ -271,18 +282,6 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, */ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ); -/** - * @brief Check if the amount of time waiting for PINGRESP has exceeded - * the specified timeout, then reset the keep-alive timer. - * - * This should only be called after a control packet has been sent. - * - * @param[in] pxTimer The auto-reload software timer for handling keep alive. - * - * @return The status returned by #xTimerReset. - */ -static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer ); - /** * @brief This callback is invoked through an auto-reload software timer. * @@ -291,7 +290,18 @@ static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer ); * * @param[in] pxTimer The auto-reload software timer for handling keep alive. */ -static void prvKeepAliveTimerCallback( TimerHandle_t pxTimer ); +static void prvPingReqTimerCallback( TimerHandle_t pxTimer ); + +/** + * @brief This callback is invoked through a software timer that is started by + * #prvPingReqTimerCallback. + * + * Its responsibility is to check that a PINGRESP has been received within + * the specified keep-alive timeout period. + * + * @param[in] pxTimer The auto-reload software timer for handling keep alive. + */ +static void prvPingRespTimerCallback( TimerHandle_t pxTimer ); /** * @brief The application callback function for getting the incoming publish @@ -318,12 +328,10 @@ static uint8_t ucSharedBuffer[ mqttexampleSHARED_BUFFER_SIZE ]; static uint8_t ucPingReqBuffer[ MQTT_PACKET_PINGREQ_SIZE ]; /** - * @brief Global entry time into the application to use as a reference timestamp - * in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference - * between the current time and the global entry time. This will reduce the chances - * of overflow for the 32 bit unsigned integer used for holding the timestamp. + * @brief Packet Identifier generated when Publish request was sent to the broker; + * it is used to match received Publish ACK to the transmitted Publish packet. */ -static uint32_t ulGlobalEntryTimeMs; +static uint16_t usPublishPacketIdentifier; /** * @brief Packet Identifier generated when Subscribe request was sent to the broker; @@ -357,14 +365,26 @@ static topicFilterContext_t xTopicFilterContext[ mqttexampleTOPIC_COUNT ] = }; /** - * @brief Timer to handle the MQTT keep-alive mechanism. + * @brief Auto-reload timer to send a PINGREQ packet every time + * #mqttexamplePING_REQUEST_DELAY ticks have passed. */ -static TimerHandle_t xKeepAliveTimer; +static TimerHandle_t xPingReqTimer; /** - * @brief Storage space for xKeepAliveTimer. + * @brief Storage space for xPingReqTimer. */ -static StaticTimer_t xKeepAliveTimerBuffer; +static StaticTimer_t xPingReqTimerBuffer; + +/** + * @brief Auto-reload timer to check that a PINGRESP packet from the broker + * was received before the timeout period. + */ +static TimerHandle_t xPingRespTimer; + +/** + * @brief Storage space for xPingRespTimer. + */ +static StaticTimer_t xPingRespTimerBuffer; /** * @brief Set to true when PINGREQ is sent then false when PINGRESP is received. @@ -372,14 +392,24 @@ static StaticTimer_t xKeepAliveTimerBuffer; static volatile bool xWaitingForPingResp = false; /** - * @brief The last time when a PINGREQ was sent over the network. + * @brief A flag indicating whether a PUBACK from the broker was received. */ -static uint32_t ulPingReqSendTimeMs; +static BaseType_t xReceivedPubAck = pdFALSE; /** - * @brief Timeout for a pending PINGRESP from the MQTT broker. + * @brief A flag indicating whether a SUBACK from the broker was received. */ -static uint32_t ulPingRespTimeoutMs = ( mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS / 4 ) * MILLISECONDS_PER_SECOND; +static BaseType_t xReceivedSubAck = pdFALSE; + +/** + * @brief A flag indicating whether an UNSUBACK from the broker was received. + */ +static BaseType_t xReceivedUnsubAck = pdFALSE; + +/** + * @brief The number of times #MQTT_ReceiveLoop has been called in the loop. + */ +static uint32_t ulReceiveLoopIterations = 0; /** * @brief Static buffer used to hold an MQTT PINGREQ packet for keep-alive mechanism. @@ -390,7 +420,7 @@ const static MQTTFixedBuffer_t xPingReqBuffer = .size = MQTT_PACKET_PINGREQ_SIZE }; -/** +/** * @brief Static buffer used to hold MQTT messages being sent and received. */ static MQTTFixedBuffer_t xBuffer = @@ -421,8 +451,7 @@ void vStartSimpleMQTTDemo( void ) static void prvMQTTDemoTask( void * pvParameters ) { - uint32_t ulPublishCount = 0U, ulTopicCount = 0U; - const uint32_t ulMaxPublishCount = 5UL; + uint32_t ulTopicCount = 0U; NetworkContext_t xNetworkContext = { 0 }; MQTTContext_t xMQTTContext; MQTTStatus_t xMQTTStatus; @@ -432,8 +461,6 @@ static void prvMQTTDemoTask( void * pvParameters ) /* Remove compiler warnings about unused parameters. */ ( void ) pvParameters; - ulGlobalEntryTimeMs = prvGetTimeMs(); - /* Serialize a PINGREQ packet to send upon invoking the keep-alive timer * callback. */ xMQTTStatus = MQTT_SerializePingreq( &xPingReqBuffer ); @@ -457,17 +484,24 @@ static void prvMQTTDemoTask( void * pvParameters ) LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) ); prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext ); - /* Create an auto-reload timer to handle keep-alive. */ - xKeepAliveTimer = xTimerCreateStatic( "KeepAliveTimer", - mqttexampleKEEP_ALIVE_DELAY, - pdTRUE, - ( void * ) &xMQTTContext.transportInterface, - prvKeepAliveTimerCallback, - &xKeepAliveTimerBuffer ); - configASSERT( xKeepAliveTimer ); + /* Create timers to handle keep-alive. */ + xPingReqTimer = xTimerCreateStatic( "PingReqTimer", + mqttexamplePING_REQUEST_DELAY, + pdTRUE, + ( void * ) &xMQTTContext.transportInterface, + prvPingReqTimerCallback, + &xPingReqTimerBuffer ); + configASSERT( xPingReqTimer ); + xPingRespTimer = xTimerCreateStatic( "PingRespTimer", + mqttexamplePING_RESPONSE_DELAY, + pdFALSE, + NULL, + prvPingRespTimerCallback, + &xPingRespTimerBuffer ); + configASSERT( xPingRespTimer ); - /* Start the timer for keep alive. */ - xTimerStatus = xTimerStart( xKeepAliveTimer, 0 ); + /* Start the timer to send a PINGREQ. */ + xTimerStatus = xTimerStart( xPingReqTimer, 0 ); configASSERT( xTimerStatus == pdPASS ); /**************************** Subscribe. ******************************/ @@ -477,32 +511,56 @@ static void prvMQTTDemoTask( void * pvParameters ) * strategy declared in retry_utils.h. */ prvMQTTSubscribeWithBackoffRetries( &xMQTTContext ); + /************************ Send PINGREQ packet. ************************/ + + /* Deliberately delay in order for the auto-reload timer to send a + * PINGREQ to the broker. */ + vTaskDelay( mqttexamplePING_REQUEST_DELAY ); + /********************* Publish and Receive Loop. **********************/ - /* Publish messages with QOS0, send and process keep-alive messages. */ - for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ ) + /* Publish messages with QOS1, send and process keep-alive messages. */ + LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) ); + prvMQTTPublishToTopic( &xMQTTContext ); + + /* Process the incoming publish echo. Since the application subscribed to + * the same topic, the broker will send the same publish message back + * to the application. */ + LogInfo( ( "Attempt to receive publish message from broker." ) ); + + while( xReceivedPubAck == pdFALSE ) { - LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) ); - prvMQTTPublishToTopic( &xMQTTContext ); + ulReceiveLoopIterations += 1U; + configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS ); - /* Process the incoming publish echo. Since the application subscribed to - * the same topic, the broker will send the same publish message back - * to the application. */ - LogInfo( ( "Attempt to receive publish message from broker." ) ); - xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); + vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY ); + + xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U ); configASSERT( xMQTTStatus == MQTTSuccess ); - - /* Leave Connection Idle for some time. */ - LogInfo( ( "Keeping Connection Idle..." ) ); - vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES ); } + /* Reset after loop. */ + ulReceiveLoopIterations = 0U; + xReceivedPubAck = pdFALSE; + /******************** Unsubscribe from the topic. *********************/ LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) ); prvMQTTUnsubscribeFromTopic( &xMQTTContext ); /* Process an incoming packet from the broker. */ - xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); - configASSERT( xMQTTStatus == MQTTSuccess ); + while( xReceivedUnsubAck == pdFALSE ) + { + ulReceiveLoopIterations += 1U; + configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS ); + + vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY ); + + xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U ); + configASSERT( xMQTTStatus == MQTTSuccess ); + } + + /* Reset after loop. */ + ulReceiveLoopIterations = 0U; + xReceivedUnsubAck = pdFALSE; /**************************** Disconnect. *****************************/ @@ -514,8 +572,10 @@ static void prvMQTTDemoTask( void * pvParameters ) xMQTTStatus = MQTT_Disconnect( &xMQTTContext ); configASSERT( xMQTTStatus == MQTTSuccess ); - /* Stop the keep-alive timer for the next iteration. */ - xTimerStatus = xTimerStop( xKeepAliveTimer, 0 ); + /* Stop the keep-alive timers for the next iteration. */ + xTimerStatus = xTimerStop( xPingReqTimer, 0 ); + configASSERT( xTimerStatus == pdPASS ); + xTimerStatus = xTimerStop( xPingRespTimer, 0 ); configASSERT( xTimerStatus == pdPASS ); /* Close the network connection. */ @@ -605,7 +665,7 @@ static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext, xTransport.recv = Plaintext_FreeRTOS_recv; /* Initialize MQTT library. */ - xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer ); + xResult = MQTT_Init( pxMQTTContext, &xTransport, prvMockedGetTime, prvEventCallback, &xBuffer ); configASSERT( xResult == MQTTSuccess ); /* Many fields not used in this demo so start with everything at 0. */ @@ -666,6 +726,7 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess; RetryUtilsParams_t xRetryParams; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; + BaseType_t xTimerStatus; bool xFailedSubscribeToTopic = false; uint32_t ulTopicCount = 0U; @@ -676,8 +737,8 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to - * only one topic and uses QoS0. */ - xMQTTSubscription[ 0 ].qos = MQTTQoS0; + * only one topic and uses QoS1. */ + xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); @@ -692,8 +753,8 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) * subscribe packet then waiting for a subscribe acknowledgment (SUBACK). * This client will then publish to the same topic it subscribed to, so it * will expect all the messages it sends to the broker to be sent back to it - * from the broker. This demo uses QOS0 in Subscribe. Therefore, the publish - * messages received from the broker will have QOS0. */ + * from the broker. This demo uses QoS1 in Subscribe. Therefore, the publish + * messages received from the broker will have QoS1. */ LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) ); xResult = MQTT_Subscribe( pxMQTTContext, xMQTTSubscription, @@ -703,6 +764,10 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) ); + /* When a SUBSCRIBE packet has been sent, the keep-alive timer can be reset. */ + xTimerStatus = xTimerReset( xPingReqTimer, 0 ); + configASSERT( xTimerStatus == pdPASS ); + /* Process incoming packet from the broker. After sending the subscribe, the * client may receive a publish before it receives a subscribe ack. Therefore, * call the generic incoming packet processing function. Since this demo is @@ -710,13 +775,25 @@ static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext ) * receiving a publish message before a subscribe ack is zero; but the application * must be ready to receive any packet. This demo uses the generic packet * processing function everywhere to highlight this fact. */ - xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexampleRECEIVE_LOOP_TIMEOUT_MS ); - configASSERT( xResult == MQTTSuccess ); + while( xReceivedSubAck == pdFALSE ) + { + ulReceiveLoopIterations += 1U; + configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS ); + + vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY ); + + xResult = MQTT_ReceiveLoop( pxMQTTContext, 0U ); + configASSERT( xResult == MQTTSuccess ); + } + + /* Reset in case another attempt to subscribe is needed. */ + ulReceiveLoopIterations = 0U; + xReceivedSubAck = pdFALSE; /* Reset flag before checking suback responses. */ xFailedSubscribeToTopic = false; - /* Check if the recent subscription request has been rejected. #xTopicFilterContext + /* Check if the recent subscription request has been rejected. #xTopicFilterContext * is updated in the event callback to reflect the status of the SUBACK * sent by the broker. It represents either the QoS level granted by the * server upon subscription or acknowledgement of server rejection of the @@ -752,20 +829,23 @@ static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext ) /* Some fields are not used by this demo so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) ); - /* This demo uses QoS0. */ - xMQTTPublishInfo.qos = MQTTQoS0; + /* This demo uses QoS1. */ + xMQTTPublishInfo.qos = MQTTQoS1; xMQTTPublishInfo.retain = false; xMQTTPublishInfo.pTopicName = mqttexampleTOPIC; xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE; xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE ); - /* Send a PUBLISH packet. Packet ID is not used for a QoS0 publish. */ - xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U ); + /* Get a unique packet id. */ + usPublishPacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); + + /* Send a PUBLISH packet. */ + xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, usPublishPacketIdentifier ); configASSERT( xResult == MQTTSuccess ); /* When a PUBLISH packet has been sent, the keep-alive timer can be reset. */ - xTimerStatus = prvCheckTimeoutThenResetTimer( xKeepAliveTimer ); + xTimerStatus = xTimerReset( xPingReqTimer, 0 ); configASSERT( xTimerStatus == pdPASS ); } /*-----------------------------------------------------------*/ @@ -774,6 +854,7 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) { MQTTStatus_t xResult; MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ]; + BaseType_t xTimerStatus; /* Some fields are not used by this demo, so start with everything at 0. */ ( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) ); @@ -782,8 +863,8 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext ); /* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to - * only one topic and uses QoS0. */ - xMQTTSubscription[ 0 ].qos = MQTTQoS0; + * only one topic and uses QoS1. */ + xMQTTSubscription[ 0 ].qos = MQTTQoS1; xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC; xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC ); @@ -795,8 +876,11 @@ static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext ) xMQTTSubscription, sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ), usUnsubscribePacketIdentifier ); - configASSERT( xResult == MQTTSuccess ); + + /* When an UNSUBSCRIBE packet has been sent, the keep-alive timer can be reset. */ + xTimerStatus = xTimerReset( xPingReqTimer, 0 ); + configASSERT( xTimerStatus == pdPASS ); } /*-----------------------------------------------------------*/ @@ -807,7 +891,15 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, switch( pxIncomingPacket->type ) { + case MQTT_PACKET_TYPE_PUBACK: + xReceivedPubAck = pdTRUE; + LogInfo( ( "PUBACK received for packet Id %u.\r\n", usPacketId ) ); + /* Make sure ACK packet identifier matches with Request packet identifier. */ + configASSERT( usPublishPacketIdentifier == usPacketId ); + break; + case MQTT_PACKET_TYPE_SUBACK: + xReceivedSubAck = pdTRUE; /* A SUBACK from the broker, containing the server response to our subscription request, has been received. * It contains the status code indicating server approval/rejection for the subscription to the single topic @@ -830,6 +922,7 @@ static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket, break; case MQTT_PACKET_TYPE_UNSUBACK: + xReceivedUnsubAck = pdTRUE; LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) ); /* Make sure ACK packet identifier matches with Request packet identifier. */ configASSERT( usUnsubscribePacketIdentifier == usPacketId ); @@ -869,7 +962,7 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) } else { - LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.", + LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.\r\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); } @@ -877,52 +970,43 @@ static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) /*-----------------------------------------------------------*/ -static BaseType_t prvCheckTimeoutThenResetTimer( TimerHandle_t pxTimer ) -{ - uint32_t now = 0U; - - if( xWaitingForPingResp == true ) - { - now = prvGetTimeMs(); - /* Assert that the PINGRESP timeout has not expired. */ - configASSERT( ( now - ulPingReqSendTimeMs ) <= ulPingRespTimeoutMs ); - } - - return xTimerReset( pxTimer, 0 ); -} - -/*-----------------------------------------------------------*/ - -static void prvKeepAliveTimerCallback( TimerHandle_t pxTimer ) +static void prvPingReqTimerCallback( TimerHandle_t pxTimer ) { TransportInterface_t * pxTransport; int32_t xTransportStatus; - uint32_t now = 0U; + BaseType_t xTimerStatus; pxTransport = ( TransportInterface_t * ) pvTimerGetTimerID( pxTimer ); - if( xWaitingForPingResp == true ) + /* Do not resend if waiting on a PINGRESP. */ + if( xWaitingForPingResp == false ) { - now = prvGetTimeMs(); - /* Assert that the PINGRESP timeout has not expired. */ - configASSERT( ( now - ulPingReqSendTimeMs ) <= ulPingRespTimeoutMs ); - } - else - { - /* Send Ping Request to the broker. */ - LogInfo( ( "Attempt to ping the MQTT broker." ) ); + /* Send PINGREQ to broker */ + LogInfo( ( "Ping the MQTT broker." ) ); xTransportStatus = pxTransport->send( pxTransport->pNetworkContext, ( void * ) xPingReqBuffer.pBuffer, xPingReqBuffer.size ); configASSERT( ( size_t ) xTransportStatus == xPingReqBuffer.size ); - ulPingReqSendTimeMs = prvGetTimeMs(); xWaitingForPingResp = true; + /* Start the timer to expect a PINGRESP. */ + xTimerStatus = xTimerStart( xPingRespTimer, 0 ); + configASSERT( xTimerStatus == pdPASS ); } } /*-----------------------------------------------------------*/ +static void prvPingRespTimerCallback( TimerHandle_t pxTimer ) +{ + ( void ) pxTimer; + + /* Assert that a pending PINGRESP has been received. */ + configASSERT( xWaitingForPingResp == false ); +} + +/*-----------------------------------------------------------*/ + static void prvEventCallback( MQTTContext_t * pxMQTTContext, MQTTPacketInfo_t * pxPacketInfo, MQTTDeserializedInfo_t * pxDeserializedInfo ) @@ -942,22 +1026,9 @@ static void prvEventCallback( MQTTContext_t * pxMQTTContext, /*-----------------------------------------------------------*/ -static uint32_t prvGetTimeMs( void ) +static uint32_t prvMockedGetTime( void ) { - TickType_t xTickCount = 0; - uint32_t ulTimeMs = 0UL; - - /* Get the current tick count. */ - xTickCount = xTaskGetTickCount(); - - /* Convert the ticks to milliseconds. */ - ulTimeMs = ( uint32_t ) xTickCount * MILLISECONDS_PER_TICK; - - /* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the - * elapsed time in the application. */ - ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs ); - - return ulTimeMs; + return 0; } /*-----------------------------------------------------------*/