mirror of
https://github.com/FreeRTOS/FreeRTOS.git
synced 2025-06-28 12:57:52 +08:00
Add retries to demos in case of a failure. (#435)
This commit is contained in:
@ -107,6 +107,22 @@
|
||||
*/
|
||||
#define DEFENDER_RESPONSE_REPORT_ID_FIELD_LENGTH ( sizeof( DEFENDER_RESPONSE_REPORT_ID_FIELD ) - 1 )
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to run the loop in this demo.
|
||||
*
|
||||
* @note The demo loop is attempted to re-run only if it fails in an iteration.
|
||||
* Once the demo loop succeeds in an iteration, the demo exits successfully.
|
||||
*/
|
||||
#ifndef DEFENDER_MAX_DEMO_LOOP_COUNT
|
||||
#define DEFENDER_MAX_DEMO_LOOP_COUNT ( 3 )
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Time in ticks to wait between retries of the demo loop if
|
||||
* demo loop fails.
|
||||
*/
|
||||
#define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
|
||||
|
||||
/**
|
||||
* @brief Status values of the device defender report.
|
||||
*/
|
||||
@ -633,6 +649,7 @@ void prvDefenderDemoTask( void * pvParameters )
|
||||
bool xStatus = false;
|
||||
BaseType_t xExitStatus = EXIT_FAILURE;
|
||||
uint32_t ulReportLength = 0UL, i, ulMqttSessionEstablished = 0UL;
|
||||
UBaseType_t uxDemoRunCount = 0UL;
|
||||
|
||||
/* Remove compiler warnings about unused parameters. */
|
||||
( void ) pvParameters;
|
||||
@ -643,188 +660,220 @@ void prvDefenderDemoTask( void * pvParameters )
|
||||
/* Start with report not received. */
|
||||
xReportStatus = ReportStatusNotReceived;
|
||||
|
||||
/* Set a report Id to be used.
|
||||
*
|
||||
* !!!NOTE!!!
|
||||
* This demo sets the report ID to xTaskGetTickCount(), which may collide
|
||||
* if the device is reset. Reports for a Thing with a previously used
|
||||
* report ID will be assumed to be duplicates and discarded by the Device
|
||||
* Defender service. The report ID needs to be unique per report sent with
|
||||
* a given Thing. We recommend using an increasing unique id such as the
|
||||
* current timestamp. */
|
||||
ulReportId = ( uint32_t ) xTaskGetTickCount();
|
||||
|
||||
/****************************** Connect. ******************************/
|
||||
|
||||
/* Attempts to connect to the AWS IoT MQTT broker over TCP. If the
|
||||
* connection fails, retries after a timeout. Timeout value will
|
||||
* exponentially increase until maximum attempts are reached. */
|
||||
LogInfo( ( "Establishing MQTT session..." ) );
|
||||
xStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvPublishCallback );
|
||||
|
||||
if( xStatus != true )
|
||||
/* This demo runs a single loop unless there are failures in the demo execution.
|
||||
* In case of failures in the demo execution, demo loop will be retried for up to
|
||||
* DEFENDER_MAX_DEMO_LOOP_COUNT times. */
|
||||
do
|
||||
{
|
||||
LogError( ( "Failed to establish MQTT session." ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
ulMqttSessionEstablished = 1;
|
||||
}
|
||||
/* Set a report Id to be used.
|
||||
*
|
||||
* !!!NOTE!!!
|
||||
* This demo sets the report ID to xTaskGetTickCount(), which may collide
|
||||
* if the device is reset. Reports for a Thing with a previously used
|
||||
* report ID will be assumed to be duplicates and discarded by the Device
|
||||
* Defender service. The report ID needs to be unique per report sent with
|
||||
* a given Thing. We recommend using an increasing unique id such as the
|
||||
* current timestamp. */
|
||||
ulReportId = ( uint32_t ) xTaskGetTickCount();
|
||||
|
||||
/******************** Subscribe to Defender topics. *******************/
|
||||
/****************************** Connect. ******************************/
|
||||
|
||||
/* Attempt to subscribe to the AWS IoT Device Defender topics.
|
||||
* Since this demo is using JSON, in prvSubscribeToDefenderTopics() we
|
||||
* subscribe to the topics to which accepted and rejected responses are
|
||||
* received from after publishing a JSON report.
|
||||
*
|
||||
* This demo uses a constant #democonfigTHING_NAME known at compile time
|
||||
* therefore we use macros to assemble defender topic strings.
|
||||
* If the thing name is known at run time, then we could use the API
|
||||
* #Defender_GetTopic instead.
|
||||
*
|
||||
* For example, for the JSON accepted responses topic:
|
||||
*
|
||||
* #define TOPIC_BUFFER_LENGTH ( 256U )
|
||||
*
|
||||
* // Every device should have a unique thing name registered with AWS IoT Core.
|
||||
* // This example assumes that the device has a unique serial number which is
|
||||
* // registered as the thing name with AWS IoT Core.
|
||||
* const char * pThingName = GetDeviceSerialNumber();
|
||||
* uint16_t thingNameLength = ( uint16_t )strlen( pThingname );
|
||||
* char topicBuffer[ TOPIC_BUFFER_LENGTH ] = { 0 };
|
||||
* uint16_t topicLength = 0;
|
||||
* DefenderStatus_t status = DefenderSuccess;
|
||||
*
|
||||
* status = Defender_GetTopic( &( topicBuffer[ 0 ] ),
|
||||
* TOPIC_BUFFER_LENGTH,
|
||||
* pThingName,
|
||||
* thingNameLength,
|
||||
* DefenderJsonReportAccepted,
|
||||
* &( topicLength ) );
|
||||
*/
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Subscribing to defender topics..." ) );
|
||||
xStatus = prvSubscribeToDefenderTopics();
|
||||
/* Attempts to connect to the AWS IoT MQTT broker over TCP. If the
|
||||
* connection fails, retries after a timeout. Timeout value will
|
||||
* exponentially increase until maximum attempts are reached. */
|
||||
LogInfo( ( "Establishing MQTT session..." ) );
|
||||
xStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvPublishCallback );
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to subscribe to defender topics." ) );
|
||||
LogError( ( "Failed to establish MQTT session." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/*********************** Collect device metrics. **********************/
|
||||
|
||||
/* We then need to collect the metrics that will be sent to the AWS IoT
|
||||
* Device Defender service. This demo uses the functions declared in
|
||||
* in metrics_collector.h to collect network metrics. For this demo, the
|
||||
* implementation of these functions are in metrics_collector.c and
|
||||
* collects metrics using tcp_netstat utility for FreeRTOS+TCP. */
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Collecting device metrics..." ) );
|
||||
xStatus = prvCollectDeviceMetrics();
|
||||
|
||||
if( xStatus != true )
|
||||
else
|
||||
{
|
||||
LogError( ( "Failed to collect device metrics." ) );
|
||||
ulMqttSessionEstablished = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/********************** Generate defender report. *********************/
|
||||
/******************** Subscribe to Defender topics. *******************/
|
||||
|
||||
/* The data needs to be incorporated into a JSON formatted report,
|
||||
* which follows the format expected by the Device Defender service.
|
||||
* This format is documented here:
|
||||
* https://docs.aws.amazon.com/iot/latest/developerguide/detect-device-side-metrics.html
|
||||
*/
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Generating device defender report..." ) );
|
||||
xStatus = prvGenerateDeviceMetricsReport( &( ulReportLength ) );
|
||||
|
||||
if( xStatus != true )
|
||||
/* Attempt to subscribe to the AWS IoT Device Defender topics.
|
||||
* Since this demo is using JSON, in prvSubscribeToDefenderTopics() we
|
||||
* subscribe to the topics to which accepted and rejected responses are
|
||||
* received from after publishing a JSON report.
|
||||
*
|
||||
* This demo uses a constant #democonfigTHING_NAME known at compile time
|
||||
* therefore we use macros to assemble defender topic strings.
|
||||
* If the thing name is known at run time, then we could use the API
|
||||
* #Defender_GetTopic instead.
|
||||
*
|
||||
* For example, for the JSON accepted responses topic:
|
||||
*
|
||||
* #define TOPIC_BUFFER_LENGTH ( 256U )
|
||||
*
|
||||
* // Every device should have a unique thing name registered with AWS IoT Core.
|
||||
* // This example assumes that the device has a unique serial number which is
|
||||
* // registered as the thing name with AWS IoT Core.
|
||||
* const char * pThingName = GetDeviceSerialNumber();
|
||||
* uint16_t thingNameLength = ( uint16_t )strlen( pThingname );
|
||||
* char topicBuffer[ TOPIC_BUFFER_LENGTH ] = { 0 };
|
||||
* uint16_t topicLength = 0;
|
||||
* DefenderStatus_t status = DefenderSuccess;
|
||||
*
|
||||
* status = Defender_GetTopic( &( topicBuffer[ 0 ] ),
|
||||
* TOPIC_BUFFER_LENGTH,
|
||||
* pThingName,
|
||||
* thingNameLength,
|
||||
* DefenderJsonReportAccepted,
|
||||
* &( topicLength ) );
|
||||
*/
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogError( ( "Failed to generate device defender report." ) );
|
||||
}
|
||||
}
|
||||
LogInfo( ( "Subscribing to defender topics..." ) );
|
||||
xStatus = prvSubscribeToDefenderTopics();
|
||||
|
||||
/********************** Publish defender report. **********************/
|
||||
|
||||
/* The report is then published to the Device Defender service. This report
|
||||
* is published to the MQTT topic for publishing JSON reports. As before,
|
||||
* we use the defender library macros to create the topic string, though
|
||||
* #Defender_GetTopic could be used if the Thing name is acquired at
|
||||
* run time */
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Publishing device defender report..." ) );
|
||||
xStatus = prvPublishDeviceMetricsReport( ulReportLength );
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to publish device defender report." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/* Wait for the response to our report. Response will be handled by the
|
||||
* callback passed to xEstablishMqttSession() earlier.
|
||||
* The callback will verify that the MQTT messages received are from the
|
||||
* defender service's topic. Based on whether the response comes from
|
||||
* the accepted or rejected topics, it updates xReportStatus. */
|
||||
if( xStatus == true )
|
||||
{
|
||||
for( i = 0; i < DEFENDER_RESPONSE_WAIT_SECONDS; i++ )
|
||||
{
|
||||
( void ) xProcessLoop( &xMqttContext );
|
||||
|
||||
/* xReportStatus is updated in the prvPublishCallback. */
|
||||
if( xReportStatus != ReportStatusNotReceived )
|
||||
if( xStatus != true )
|
||||
{
|
||||
break;
|
||||
LogError( ( "Failed to subscribe to defender topics." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/*********************** Collect device metrics. **********************/
|
||||
|
||||
/* We then need to collect the metrics that will be sent to the AWS IoT
|
||||
* Device Defender service. This demo uses the functions declared in
|
||||
* in metrics_collector.h to collect network metrics. For this demo, the
|
||||
* implementation of these functions are in metrics_collector.c and
|
||||
* collects metrics using tcp_netstat utility for FreeRTOS+TCP. */
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Collecting device metrics..." ) );
|
||||
xStatus = prvCollectDeviceMetrics();
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to collect device metrics." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/********************** Generate defender report. *********************/
|
||||
|
||||
/* The data needs to be incorporated into a JSON formatted report,
|
||||
* which follows the format expected by the Device Defender service.
|
||||
* This format is documented here:
|
||||
* https://docs.aws.amazon.com/iot/latest/developerguide/detect-device-side-metrics.html
|
||||
*/
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Generating device defender report..." ) );
|
||||
xStatus = prvGenerateDeviceMetricsReport( &( ulReportLength ) );
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to generate device defender report." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/********************** Publish defender report. **********************/
|
||||
|
||||
/* The report is then published to the Device Defender service. This report
|
||||
* is published to the MQTT topic for publishing JSON reports. As before,
|
||||
* we use the defender library macros to create the topic string, though
|
||||
* #Defender_GetTopic could be used if the Thing name is acquired at
|
||||
* run time */
|
||||
if( xStatus == true )
|
||||
{
|
||||
LogInfo( ( "Publishing device defender report..." ) );
|
||||
xStatus = prvPublishDeviceMetricsReport( ulReportLength );
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to publish device defender report." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/* Wait for the response to our report. Response will be handled by the
|
||||
* callback passed to xEstablishMqttSession() earlier.
|
||||
* The callback will verify that the MQTT messages received are from the
|
||||
* defender service's topic. Based on whether the response comes from
|
||||
* the accepted or rejected topics, it updates xReportStatus. */
|
||||
if( xStatus == true )
|
||||
{
|
||||
for( i = 0; i < DEFENDER_RESPONSE_WAIT_SECONDS; i++ )
|
||||
{
|
||||
( void ) xProcessLoop( &xMqttContext );
|
||||
|
||||
/* xReportStatus is updated in the prvPublishCallback. */
|
||||
if( xReportStatus != ReportStatusNotReceived )
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
/* Wait for sometime between consecutive executions of ProcessLoop. */
|
||||
vTaskDelay( pdMS_TO_TICKS( 1000U ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( xReportStatus == ReportStatusNotReceived )
|
||||
{
|
||||
LogError( ( "Failed to receive response from AWS IoT Device Defender Service." ) );
|
||||
xStatus = false;
|
||||
}
|
||||
|
||||
/**************************** Disconnect. *****************************/
|
||||
|
||||
/* Unsubscribe and disconnect if MQTT session was established. Per the MQTT
|
||||
* protocol spec, it is okay to send UNSUBSCRIBE even if no corresponding
|
||||
* subscription exists on the broker. Therefore, it is okay to attempt
|
||||
* unsubscribe even if one more subscribe failed earlier. */
|
||||
if( ulMqttSessionEstablished == 1 )
|
||||
{
|
||||
LogInfo( ( "Unsubscribing from defender topics..." ) );
|
||||
xStatus = prvUnsubscribeFromDefenderTopics();
|
||||
|
||||
if( xStatus != true )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe from defender topics." ) );
|
||||
}
|
||||
|
||||
/* Wait for sometime between consecutive executions of ProcessLoop. */
|
||||
vTaskDelay( 1000 / portTICK_PERIOD_MS );
|
||||
LogInfo( ( "Closing MQTT session..." ) );
|
||||
( void ) xDisconnectMqttSession( &xMqttContext,
|
||||
&xNetworkContext );
|
||||
}
|
||||
}
|
||||
|
||||
if( xReportStatus == ReportStatusNotReceived )
|
||||
{
|
||||
LogError( ( "Failed to receive response from AWS IoT Device Defender Service." ) );
|
||||
xStatus = false;
|
||||
}
|
||||
|
||||
/**************************** Disconnect. *****************************/
|
||||
|
||||
/* Unsubscribe and disconnect if MQTT session was established. Per the MQTT
|
||||
* protocol spec, it is okay to send UNSUBSCRIBE even if no corresponding
|
||||
* subscription exists on the broker. Therefore, it is okay to attempt
|
||||
* unsubscribe even if one more subscribe failed earlier. */
|
||||
if( ulMqttSessionEstablished == 1 )
|
||||
{
|
||||
LogInfo( ( "Unsubscribing from defender topics..." ) );
|
||||
xStatus = prvUnsubscribeFromDefenderTopics();
|
||||
|
||||
if( xStatus != true )
|
||||
if( ( xStatus == true ) && ( xReportStatus == ReportStatusAccepted ) )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe from defender topics." ) );
|
||||
xExitStatus = EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
LogInfo( ( "Closing MQTT session..." ) );
|
||||
( void ) xDisconnectMqttSession( &xMqttContext,
|
||||
&xNetworkContext );
|
||||
}
|
||||
/*********************** Retry in case of failure. ************************/
|
||||
|
||||
/* Increment the demo run count. */
|
||||
uxDemoRunCount++;
|
||||
|
||||
if( xExitStatus == EXIT_SUCCESS )
|
||||
{
|
||||
LogInfo( ( "Demo iteration %lu is successful.", uxDemoRunCount ) );
|
||||
}
|
||||
/* Attempt to retry a failed iteration of demo for up to #DEFENDER_MAX_DEMO_LOOP_COUNT times. */
|
||||
else if( uxDemoRunCount < DEFENDER_MAX_DEMO_LOOP_COUNT )
|
||||
{
|
||||
LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
|
||||
vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
|
||||
}
|
||||
/* Failed all #DEFENDER_MAX_DEMO_LOOP_COUNT demo iterations. */
|
||||
else
|
||||
{
|
||||
LogError( ( "All %d demo iterations failed.", DEFENDER_MAX_DEMO_LOOP_COUNT ) );
|
||||
break;
|
||||
}
|
||||
} while( xExitStatus != EXIT_SUCCESS );
|
||||
|
||||
/****************************** Finish. ******************************/
|
||||
|
||||
if( ( xStatus == true ) && ( xReportStatus == ReportStatusAccepted ) )
|
||||
if( xExitStatus == EXIT_SUCCESS )
|
||||
{
|
||||
xExitStatus = EXIT_SUCCESS;
|
||||
LogInfo( ( "Demo completed successfully." ) );
|
||||
}
|
||||
else
|
||||
|
@ -148,6 +148,55 @@
|
||||
*/
|
||||
#define SHADOW_REPORTED_JSON_LENGTH ( sizeof( SHADOW_REPORTED_JSON ) - 3 )
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to run the loop in this demo.
|
||||
*
|
||||
* @note The demo loop is attempted to re-run only if it fails in an iteration.
|
||||
* Once the demo loop succeeds in an iteration, the demo exits successfully.
|
||||
*/
|
||||
#ifndef SHADOW_MAX_DEMO_LOOP_COUNT
|
||||
#define SHADOW_MAX_DEMO_LOOP_COUNT ( 3 )
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Time in ticks to wait between retries of the demo loop if
|
||||
* demo loop fails.
|
||||
*/
|
||||
#define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to call MQTT_ProcessLoop() when waiting
|
||||
* for a response for Shadow delete operation.
|
||||
*/
|
||||
#define MQTT_PROCESS_LOOP_DELETE_RESPONSE_COUNT_MAX ( 30U )
|
||||
|
||||
/**
|
||||
* @brief Timeout for MQTT_ProcessLoop in milliseconds.
|
||||
*/
|
||||
#define MQTT_PROCESS_LOOP_TIMEOUT_MS ( 700U )
|
||||
|
||||
/**
|
||||
* @brief JSON key for response code that indicates the type of error in
|
||||
* the error document received on topic `/delete/rejected`.
|
||||
*/
|
||||
#define SHADOW_DELETE_REJECTED_ERROR_CODE_KEY "code"
|
||||
|
||||
/**
|
||||
* @brief Length of #SHADOW_DELETE_REJECTED_ERROR_CODE_KEY.
|
||||
*/
|
||||
#define SHADOW_DELETE_REJECTED_ERROR_CODE_KEY_LENGTH ( ( uint16_t ) ( sizeof( SHADOW_DELETE_REJECTED_ERROR_CODE_KEY ) - 1 ) )
|
||||
|
||||
/**
|
||||
* @brief Error response code sent from AWS IoT Shadow service when an attempt
|
||||
* is made to delete a Shadow document that doesn't exist.
|
||||
*/
|
||||
#define SHADOW_NO_SHADOW_EXISTS_ERROR_CODE "404"
|
||||
|
||||
/**
|
||||
* @brief Length of #SHADOW_NO_SHADOW_EXISTS_ERROR_CODE.
|
||||
*/
|
||||
#define SHADOW_NO_SHADOW_EXISTS_ERROR_CODE_LENGTH ( ( uint16_t ) ( sizeof( SHADOW_NO_SHADOW_EXISTS_ERROR_CODE ) - 1 ) )
|
||||
|
||||
/*------------- Demo configurations -------------------------*/
|
||||
|
||||
#ifndef democonfigTHING_NAME
|
||||
@ -225,6 +274,24 @@ static BaseType_t xUpdateDeltaReturn = pdPASS;
|
||||
*/
|
||||
static BaseType_t xUpdateAcceptedReturn = pdPASS;
|
||||
|
||||
/**
|
||||
* @brief Status of the response of Shadow delete operation from AWS IoT
|
||||
* message broker.
|
||||
*/
|
||||
static BaseType_t xDeleteResponseReceived = pdFALSE;
|
||||
|
||||
/**
|
||||
* @brief Status of the Shadow delete operation.
|
||||
*
|
||||
* The Shadow delete status will be updated by the incoming publishes on the
|
||||
* MQTT topics for delete acknowledgement from AWS IoT message broker
|
||||
* (accepted/rejected). Shadow document is considered to be deleted if an
|
||||
* incoming publish is received on `/delete/accepted` topic or an incoming
|
||||
* publish is received on `/delete/rejected` topic with error code 404. Code 404
|
||||
* indicates that the Shadow document does not exist for the Thing yet.
|
||||
*/
|
||||
static BaseType_t xShadowDeleted = pdFALSE;
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
/**
|
||||
@ -271,6 +338,126 @@ static void prvUpdateAcceptedHandler( MQTTPublishInfo_t * pxPublishInfo );
|
||||
*/
|
||||
static void prvShadowDemoTask( void * pvParameters );
|
||||
|
||||
/**
|
||||
* @brief Process payload from `/delete/rejected` topic.
|
||||
*
|
||||
* This handler examines the rejected message to look for the reject reason code.
|
||||
* If the reject reason code is `404`, an attempt was made to delete a shadow
|
||||
* document which was not present yet. This is considered to be success for this
|
||||
* demo application.
|
||||
*
|
||||
* @param[in] pxPublishInfo Deserialized publish info pointer for the incoming
|
||||
* packet.
|
||||
*/
|
||||
static void prvDeleteRejectedHandler( MQTTPublishInfo_t * pxPublishInfo );
|
||||
|
||||
/**
|
||||
* @brief Helper function to wait for a response for Shadow delete operation.
|
||||
*
|
||||
* @param[in] pxMQTTContext MQTT context pointer.
|
||||
*
|
||||
* @return pdPASS if successfully received a response for Shadow delete
|
||||
* operation; pdFAIL otherwise.
|
||||
*/
|
||||
static BaseType_t prvWaitForDeleteResponse( MQTTContext_t * pxMQTTContext );
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
static BaseType_t prvWaitForDeleteResponse( MQTTContext_t * pxMQTTContext )
|
||||
{
|
||||
uint8_t ucCount = 0U;
|
||||
MQTTStatus_t xMQTTStatus = MQTTSuccess;
|
||||
BaseType_t xReturnStatus = pdPASS;
|
||||
|
||||
configASSERT( pxMQTTContext != NULL );
|
||||
|
||||
while( ( xDeleteResponseReceived != pdTRUE ) &&
|
||||
( ucCount++ < MQTT_PROCESS_LOOP_DELETE_RESPONSE_COUNT_MAX ) &&
|
||||
( xMQTTStatus == MQTTSuccess ) )
|
||||
{
|
||||
/* Event callback will set #xDeleteResponseReceived when receiving an
|
||||
* incoming publish on either `/delete/accepted` or `/delete/rejected`
|
||||
* Shadow topics. */
|
||||
xMQTTStatus = MQTT_ProcessLoop( pxMQTTContext, MQTT_PROCESS_LOOP_TIMEOUT_MS );
|
||||
}
|
||||
|
||||
if( ( xMQTTStatus != MQTTSuccess ) || ( xDeleteResponseReceived != pdTRUE ) )
|
||||
{
|
||||
LogError( ( "MQTT_ProcessLoop failed to receive a response for Shadow delete operation:"
|
||||
" LoopDuration=%u, MQTT Status=%s.",
|
||||
( MQTT_PROCESS_LOOP_TIMEOUT_MS * ucCount ),
|
||||
MQTT_Status_strerror( xMQTTStatus ) ) );
|
||||
xReturnStatus = pdFAIL;
|
||||
}
|
||||
|
||||
return xReturnStatus;
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
static void prvDeleteRejectedHandler( MQTTPublishInfo_t * pxPublishInfo )
|
||||
{
|
||||
JSONStatus_t result = JSONSuccess;
|
||||
char * pcOutValue = NULL;
|
||||
uint32_t ulOutValueLength = 0UL;
|
||||
|
||||
configASSERT( pxPublishInfo != NULL );
|
||||
configASSERT( pxPublishInfo->pPayload != NULL );
|
||||
|
||||
LogInfo( ( "/delete/rejected json payload:%s.", ( const char * ) pxPublishInfo->pPayload ) );
|
||||
|
||||
/* The payload will look similar to this:
|
||||
* {
|
||||
* "code": error-code,
|
||||
* "message": "error-message",
|
||||
* "timestamp": timestamp,
|
||||
* "clientToken": "token"
|
||||
* }
|
||||
*/
|
||||
|
||||
/* Make sure the payload is a valid json document. */
|
||||
result = JSON_Validate( pxPublishInfo->pPayload,
|
||||
pxPublishInfo->payloadLength );
|
||||
|
||||
if( result == JSONSuccess )
|
||||
{
|
||||
/* Then we start to get the version value by JSON keyword "version". */
|
||||
result = JSON_SearchConst( pxPublishInfo->pPayload,
|
||||
pxPublishInfo->payloadLength,
|
||||
SHADOW_DELETE_REJECTED_ERROR_CODE_KEY,
|
||||
SHADOW_DELETE_REJECTED_ERROR_CODE_KEY_LENGTH,
|
||||
&pcOutValue,
|
||||
( size_t * ) &ulOutValueLength,
|
||||
NULL );
|
||||
}
|
||||
else
|
||||
{
|
||||
LogError( ( "The json document is invalid!!" ) );
|
||||
}
|
||||
|
||||
if( result == JSONSuccess )
|
||||
{
|
||||
LogInfo( ( "Error code is: %.*s.",
|
||||
ulOutValueLength,
|
||||
pcOutValue ) );
|
||||
|
||||
/* Check if error code is `404`. An error code `404` indicates that an
|
||||
* attempt was made to delete a Shadow document that didn't exist. */
|
||||
if( ulOutValueLength == SHADOW_NO_SHADOW_EXISTS_ERROR_CODE_LENGTH )
|
||||
{
|
||||
if( strncmp( pcOutValue, SHADOW_NO_SHADOW_EXISTS_ERROR_CODE,
|
||||
SHADOW_NO_SHADOW_EXISTS_ERROR_CODE_LENGTH ) == 0 )
|
||||
{
|
||||
xShadowDeleted = pdTRUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LogError( ( "No error code in json document!!" ) );
|
||||
}
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
static void prvUpdateDeltaHandler( MQTTPublishInfo_t * pxPublishInfo )
|
||||
@ -536,6 +723,18 @@ static void prvEventCallback( MQTTContext_t * pxMqttContext,
|
||||
{
|
||||
LogInfo( ( "/update/rejected json payload:%s.", ( const char * ) pxDeserializedInfo->pPublishInfo->pPayload ) );
|
||||
}
|
||||
else if( messageType == ShadowMessageTypeDeleteAccepted )
|
||||
{
|
||||
LogInfo( ( "Received an MQTT incoming publish on /delete/accepted topic." ) );
|
||||
xShadowDeleted = pdTRUE;
|
||||
xDeleteResponseReceived = pdTRUE;
|
||||
}
|
||||
else if( messageType == ShadowMessageTypeDeleteRejected )
|
||||
{
|
||||
/* Handler function to process payload. */
|
||||
prvDeleteRejectedHandler( pxDeserializedInfo->pPublishInfo );
|
||||
xDeleteResponseReceived = pdTRUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogInfo( ( "Other message type:%d !!", messageType ) );
|
||||
@ -593,7 +792,8 @@ void vStartShadowDemo( void )
|
||||
*/
|
||||
void prvShadowDemoTask( void * pvParameters )
|
||||
{
|
||||
BaseType_t demoStatus = pdPASS;
|
||||
BaseType_t xDemoStatus = pdPASS;
|
||||
UBaseType_t uxDemoRunCount = 0UL;
|
||||
|
||||
/* A buffer containing the update document. It has static duration to prevent
|
||||
* it from being placed on the call stack. */
|
||||
@ -607,205 +807,291 @@ void prvShadowDemoTask( void * pvParameters )
|
||||
|
||||
/****************************** Connect. ******************************/
|
||||
|
||||
demoStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvEventCallback );
|
||||
|
||||
if( demoStatus == pdFAIL )
|
||||
/* This demo runs a single loop unless there are failures in the demo execution.
|
||||
* In case of failures in the demo execution, demo loop will be retried for up to
|
||||
* SHADOW_MAX_DEMO_LOOP_COUNT times. */
|
||||
do
|
||||
{
|
||||
/* Log error to indicate connection failure. */
|
||||
LogError( ( "Failed to connect to MQTT broker." ) );
|
||||
}
|
||||
/****************************** Connect. ******************************/
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
/* Try to delete any Shadow document in the cloud. This is done to
|
||||
* ensure possbile previous Shadow documents do not affect the state
|
||||
* expected in the current demo. */
|
||||
demoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
0U );
|
||||
}
|
||||
xDemoStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvEventCallback );
|
||||
|
||||
/********************* Subscribe to Shadow topics. ************************/
|
||||
|
||||
/* Then try to subscribe the Shadow topics. */
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
demoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
demoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
demoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
/********************* Publish to Shadow topics. **********************/
|
||||
|
||||
/* This demo uses a constant #democonfigTHING_NAME known at compile time
|
||||
* therefore we can use macros to assemble shadow topic strings.
|
||||
* If the thing name is known at run time, then we could use the API
|
||||
* #Shadow_GetTopicString to assemble shadow topic strings, here is the
|
||||
* example for /update/delta:
|
||||
*
|
||||
* For /update/delta:
|
||||
*
|
||||
* #define SHADOW_TOPIC_MAX_LENGTH (256U)
|
||||
*
|
||||
* ShadowStatus_t shadowStatus = SHADOW_STATUS_SUCCESS;
|
||||
* char cTopicBuffer[ SHADOW_TOPIC_MAX_LENGTH ] = { 0 };
|
||||
* uint16_t usBufferSize = SHADOW_TOPIC_MAX_LENGTH;
|
||||
* uint16_t usOutLength = 0;
|
||||
* const char * pcThingName = "TestThingName";
|
||||
* uint16_t usThingNameLength = ( sizeof( pcThingName ) - 1U );
|
||||
*
|
||||
* shadowStatus = Shadow_GetTopicString( SHADOW_TOPIC_STRING_TYPE_UPDATE_DELTA,
|
||||
* pcThingName,
|
||||
* usThingNameLength,
|
||||
* & ( cTopicBuffer[ 0 ] ),
|
||||
* usBufferSize,
|
||||
* & usOutLength );
|
||||
*/
|
||||
|
||||
/* Then we publish a desired state to the /update topic. Since we've deleted
|
||||
* the device shadow at the beginning of the demo, this will cause a delta
|
||||
* message to be published, which we have subscribed to.
|
||||
* In many real applications, the desired state is not published by
|
||||
* the device itself. But for the purpose of making this demo self-contained,
|
||||
* we publish one here so that we can receive a delta message later.
|
||||
*/
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
/* Desired power on state . */
|
||||
LogInfo( ( "Send desired power state with 1." ) );
|
||||
|
||||
( void ) memset( pcUpdateDocument,
|
||||
0x00,
|
||||
sizeof( pcUpdateDocument ) );
|
||||
|
||||
snprintf( pcUpdateDocument,
|
||||
SHADOW_DESIRED_JSON_LENGTH + 1,
|
||||
SHADOW_DESIRED_JSON,
|
||||
( int ) 1,
|
||||
( long unsigned ) ( xTaskGetTickCount() % 1000000 ) );
|
||||
|
||||
demoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
/* Note that PublishToTopic already called MQTT_ProcessLoop,
|
||||
* therefore responses may have been received and the prvEventCallback
|
||||
* may have been called, which may have changed the stateChanged flag.
|
||||
* Check if the state change flag has been modified or not. If it's modified,
|
||||
* then we publish reported state to update topic.
|
||||
*/
|
||||
if( stateChanged == true )
|
||||
if( xDemoStatus == pdFAIL )
|
||||
{
|
||||
/* Report the latest power state back to device shadow. */
|
||||
LogInfo( ( "Report to the state change: %d", ulCurrentPowerOnState ) );
|
||||
/* Log error to indicate connection failure. */
|
||||
LogError( ( "Failed to connect to MQTT broker." ) );
|
||||
}
|
||||
|
||||
/**************** Attempt to delete Shadow document. ******************/
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Reset the shadow delete status flags. */
|
||||
xDeleteResponseReceived = pdFALSE;
|
||||
xShadowDeleted = pdFALSE;
|
||||
|
||||
/* First of all, try to delete any Shadow document in the cloud.
|
||||
* Try to subscribe to `/delete/accepted` and `/delete/rejected` topics. */
|
||||
xDemoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Try to subscribe to `/delete/rejected` topic. */
|
||||
xDemoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE_REJECTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Publish to Shadow `delete` topic to attempt to delete the
|
||||
* Shadow document if exists. */
|
||||
xDemoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
0U );
|
||||
}
|
||||
|
||||
/* Wait for an incoming publish on `/delete/accepted` or `/delete/rejected`
|
||||
* topics, if not already received a publish. */
|
||||
if( ( xDemoStatus == pdPASS ) && ( xDeleteResponseReceived != pdTRUE ) )
|
||||
{
|
||||
xDemoStatus = prvWaitForDeleteResponse( &xMqttContext );
|
||||
}
|
||||
|
||||
/* Unsubscribe from the `/delete/accepted` and 'delete/rejected` topics.*/
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_DELETE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_DELETE_REJECTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
/* Check if Shadow document delete was successful. A delete can be
|
||||
* successful in cases listed below.
|
||||
* 1. If an incoming publish is received on `/delete/accepted` topic.
|
||||
* 2. If an incoming publish is received on `/delete/rejected` topic
|
||||
* with error code 404. This indicates that a Shadow document was
|
||||
* not present for the Thing. */
|
||||
if( xShadowDeleted == pdFALSE )
|
||||
{
|
||||
LogError( ( "Shadow delete operation failed." ) );
|
||||
xDemoStatus = pdFAIL;
|
||||
}
|
||||
|
||||
/********************* Subscribe to Shadow topics. ************************/
|
||||
|
||||
/* Then try to subscribe the Shadow topics. */
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xSubscribeToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
|
||||
}
|
||||
|
||||
/********************* Publish to Shadow topics. **********************/
|
||||
|
||||
/* This demo uses a constant #democonfigTHING_NAME known at compile time
|
||||
* therefore we can use macros to assemble shadow topic strings.
|
||||
* If the thing name is known at run time, then we could use the API
|
||||
* #Shadow_GetTopicString to assemble shadow topic strings, here is the
|
||||
* example for /update/delta:
|
||||
*
|
||||
* For /update/delta:
|
||||
*
|
||||
* #define SHADOW_TOPIC_MAX_LENGTH (256U)
|
||||
*
|
||||
* ShadowStatus_t shadowStatus = SHADOW_STATUS_SUCCESS;
|
||||
* char cTopicBuffer[ SHADOW_TOPIC_MAX_LENGTH ] = { 0 };
|
||||
* uint16_t usBufferSize = SHADOW_TOPIC_MAX_LENGTH;
|
||||
* uint16_t usOutLength = 0;
|
||||
* const char * pcThingName = "TestThingName";
|
||||
* uint16_t usThingNameLength = ( sizeof( pcThingName ) - 1U );
|
||||
*
|
||||
* shadowStatus = Shadow_GetTopicString( SHADOW_TOPIC_STRING_TYPE_UPDATE_DELTA,
|
||||
* pcThingName,
|
||||
* usThingNameLength,
|
||||
* & ( cTopicBuffer[ 0 ] ),
|
||||
* usBufferSize,
|
||||
* & usOutLength );
|
||||
*/
|
||||
|
||||
/* Then we publish a desired state to the /update topic. Since we've deleted
|
||||
* the device shadow at the beginning of the demo, this will cause a delta
|
||||
* message to be published, which we have subscribed to.
|
||||
* In many real applications, the desired state is not published by
|
||||
* the device itself. But for the purpose of making this demo self-contained,
|
||||
* we publish one here so that we can receive a delta message later.
|
||||
*/
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Desired power on state . */
|
||||
LogInfo( ( "Send desired power state with 1." ) );
|
||||
|
||||
( void ) memset( pcUpdateDocument,
|
||||
0x00,
|
||||
sizeof( pcUpdateDocument ) );
|
||||
|
||||
/* Keep the client token in global variable used to compare if
|
||||
* the same token in /update/accepted. */
|
||||
ulClientToken = ( xTaskGetTickCount() % 1000000 );
|
||||
|
||||
snprintf( pcUpdateDocument,
|
||||
SHADOW_REPORTED_JSON_LENGTH + 1,
|
||||
SHADOW_REPORTED_JSON,
|
||||
( int ) ulCurrentPowerOnState,
|
||||
( long unsigned ) ulClientToken );
|
||||
SHADOW_DESIRED_JSON_LENGTH + 1,
|
||||
SHADOW_DESIRED_JSON,
|
||||
( int ) 1,
|
||||
( long unsigned ) ( xTaskGetTickCount() % 1000000 ) );
|
||||
|
||||
demoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
|
||||
xDemoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Note that PublishToTopic already called MQTT_ProcessLoop,
|
||||
* therefore responses may have been received and the prvEventCallback
|
||||
* may have been called, which may have changed the stateChanged flag.
|
||||
* Check if the state change flag has been modified or not. If it's modified,
|
||||
* then we publish reported state to update topic.
|
||||
*/
|
||||
if( stateChanged == true )
|
||||
{
|
||||
/* Report the latest power state back to device shadow. */
|
||||
LogInfo( ( "Report to the state change: %d", ulCurrentPowerOnState ) );
|
||||
( void ) memset( pcUpdateDocument,
|
||||
0x00,
|
||||
sizeof( pcUpdateDocument ) );
|
||||
|
||||
/* Keep the client token in global variable used to compare if
|
||||
* the same token in /update/accepted. */
|
||||
ulClientToken = ( xTaskGetTickCount() % 1000000 );
|
||||
|
||||
snprintf( pcUpdateDocument,
|
||||
SHADOW_REPORTED_JSON_LENGTH + 1,
|
||||
SHADOW_REPORTED_JSON,
|
||||
( int ) ulCurrentPowerOnState,
|
||||
( long unsigned ) ulClientToken );
|
||||
|
||||
xDemoStatus = xPublishToTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE( THING_NAME_LENGTH ),
|
||||
pcUpdateDocument,
|
||||
( SHADOW_DESIRED_JSON_LENGTH + 1 ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
LogInfo( ( "No change from /update/delta, unsubscribe all shadow topics and disconnect from MQTT.\r\n" ) );
|
||||
}
|
||||
}
|
||||
|
||||
/****************** Unsubscribe from Shadow topics. *******************/
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Start to unsubscribe shadow topics and disconnect from MQTT. \r\n" ) );
|
||||
|
||||
xDemoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
|
||||
|
||||
if( xDemoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
|
||||
if( xDemoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
|
||||
|
||||
if( xDemoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
/****************************** Disconnect. *******************************/
|
||||
|
||||
/* The MQTT session is always disconnected, even if there were prior failures. */
|
||||
xDemoStatus = xDisconnectMqttSession( &xMqttContext, &xNetworkContext );
|
||||
|
||||
/* This demo performs only Device Shadow operations. If matching the Shadow
|
||||
* MQTT topic fails or there are failure in parsing the received JSON document,
|
||||
* then this demo was not successful. */
|
||||
if( ( xUpdateAcceptedReturn != pdPASS ) || ( xUpdateDeltaReturn != pdPASS ) )
|
||||
{
|
||||
LogError( ( "Callback function failed." ) );
|
||||
}
|
||||
|
||||
/*********************** Retry in case of failure. ************************/
|
||||
|
||||
/* Increment the demo run count. */
|
||||
uxDemoRunCount++;
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Demo iteration %lu is successful.", uxDemoRunCount ) );
|
||||
}
|
||||
/* Attempt to retry a failed iteration of demo for up to #SHADOW_MAX_DEMO_LOOP_COUNT times. */
|
||||
else if( uxDemoRunCount < SHADOW_MAX_DEMO_LOOP_COUNT )
|
||||
{
|
||||
LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
|
||||
vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
|
||||
}
|
||||
/* Failed all #SHADOW_MAX_DEMO_LOOP_COUNT demo iterations. */
|
||||
else
|
||||
{
|
||||
LogInfo( ( "No change from /update/delta, unsubscribe all shadow topics and disconnect from MQTT.\r\n" ) );
|
||||
LogError( ( "All %d demo iterations failed.", SHADOW_MAX_DEMO_LOOP_COUNT ) );
|
||||
break;
|
||||
}
|
||||
}
|
||||
} while( xDemoStatus != pdPASS );
|
||||
|
||||
/****************** Unsubscribe from Shadow topics. *******************/
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Start to unsubscribe shadow topics and disconnect from MQTT. \r\n" ) );
|
||||
|
||||
demoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_DELTA( THING_NAME_LENGTH ) );
|
||||
|
||||
if( demoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_DELTA( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
demoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_ACCEPTED( THING_NAME_LENGTH ) );
|
||||
|
||||
if( demoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_ACCEPTED( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
{
|
||||
demoStatus = xUnsubscribeFromTopic( &xMqttContext,
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ),
|
||||
SHADOW_TOPIC_LENGTH_UPDATE_REJECTED( THING_NAME_LENGTH ) );
|
||||
|
||||
if( demoStatus != pdPASS )
|
||||
{
|
||||
LogError( ( "Failed to unsubscribe the topic %s",
|
||||
SHADOW_TOPIC_STRING_UPDATE_REJECTED( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
/****************************** Disconnect. *******************************/
|
||||
|
||||
/* The MQTT session is always disconnected, even if there were prior failures. */
|
||||
demoStatus = xDisconnectMqttSession( &xMqttContext, &xNetworkContext );
|
||||
|
||||
/* This demo performs only Device Shadow operations. If matching the Shadow
|
||||
* MQTT topic fails or there are failure in parsing the received JSON document,
|
||||
* then this demo was not successful. */
|
||||
if( ( xUpdateAcceptedReturn != pdPASS ) || ( xUpdateDeltaReturn != pdPASS ) )
|
||||
{
|
||||
LogError( ( "Callback function failed." ) );
|
||||
}
|
||||
|
||||
if( demoStatus == pdPASS )
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Demo completed successfully." ) );
|
||||
}
|
||||
|
@ -208,6 +208,22 @@
|
||||
*/
|
||||
#define MAKE_STATUS_REPORT( x ) "{\"status\":\"" x "\"}"
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to run the loop in this demo.
|
||||
*
|
||||
* @note The demo loop is attempted to re-run only if it fails in an iteration.
|
||||
* Once the demo loop succeeds in an iteration, the demo exits successfully.
|
||||
*/
|
||||
#ifndef JOBS_MAX_DEMO_LOOP_COUNT
|
||||
#define JOBS_MAX_DEMO_LOOP_COUNT ( 3 )
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Time in ticks to wait between retries of the demo loop if
|
||||
* demo loop fails.
|
||||
*/
|
||||
#define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
|
||||
|
||||
/*-----------------------------------------------------------*/
|
||||
|
||||
/**
|
||||
@ -735,6 +751,8 @@ void vStartJobsDemo( void )
|
||||
void prvJobsDemoTask( void * pvParameters )
|
||||
{
|
||||
BaseType_t xDemoStatus = pdPASS;
|
||||
UBaseType_t uxDemoRunCount = 0UL;
|
||||
BaseType_t retryDemoLoop = pdFALSE;
|
||||
|
||||
/* Remove compiler warnings about unused parameters. */
|
||||
( void ) pvParameters;
|
||||
@ -742,119 +760,163 @@ void prvJobsDemoTask( void * pvParameters )
|
||||
/* Set the pParams member of the network context with desired transport. */
|
||||
xNetworkContext.pParams = &xTlsTransportParams;
|
||||
|
||||
/* Establish an MQTT connection with AWS IoT over a mutually authenticated TLS session. */
|
||||
xDemoStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvEventCallback );
|
||||
|
||||
if( xDemoStatus == pdFAIL )
|
||||
/* This demo runs a single loop unless there are failures in the demo execution.
|
||||
* In case of failures in the demo execution, demo loop will be retried for up to
|
||||
* JOBS_MAX_DEMO_LOOP_COUNT times. */
|
||||
do
|
||||
{
|
||||
/* Log error to indicate connection failure. */
|
||||
LogError( ( "Failed to connect to AWS IoT broker." ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Print out a short user guide to the console. The default logging
|
||||
* limit of 255 characters can be changed in demo_logging.c, but breaking
|
||||
* up the only instance of a 1000+ character string is more practical. */
|
||||
LogInfo( ( "\r\n"
|
||||
"/*-----------------------------------------------------------*/\r\n"
|
||||
"\r\n"
|
||||
"The Jobs demo is now ready to accept Jobs.\r\n"
|
||||
"Jobs may be created using the AWS IoT console or AWS CLI.\r\n"
|
||||
"See the following link for more information.\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
"https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html\r\n"
|
||||
"\r\n"
|
||||
"This demo expects Job documents to have an \"action\" JSON key.\r\n"
|
||||
"The following actions are currently supported:\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - print \r\n"
|
||||
" Logs a message to the local console. The Job document must also contain a \"message\".\r\n"
|
||||
" For example: { \"action\": \"print\", \"message\": \"Hello world!\"} will cause\r\n"
|
||||
" \"Hello world!\" to be printed on the console.\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - publish \r\n"
|
||||
" Publishes a message to an MQTT topic. The Job document must also contain a \"message\" and \"topic\".\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" For example: { \"action\": \"publish\", \"topic\": \"demo/jobs\", \"message\": \"Hello world!\"} will cause\r\n"
|
||||
" \"Hello world!\" to be published to the topic \"demo/jobs\".\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - exit \r\n"
|
||||
" Exits the demo program. This program will run until { \"action\": \"exit\" } is received.\r\n"
|
||||
"\r\n"
|
||||
"/*-----------------------------------------------------------*/\r\n" ) );
|
||||
/* Establish an MQTT connection with AWS IoT over a mutually authenticated TLS session. */
|
||||
xDemoStatus = xEstablishMqttSession( &xMqttContext,
|
||||
&xNetworkContext,
|
||||
&xBuffer,
|
||||
prvEventCallback );
|
||||
|
||||
/* Subscribe to the NextJobExecutionChanged API topic to receive notifications about the next pending
|
||||
* job in the queue for the Thing resource used by this demo. */
|
||||
if( xSubscribeToTopic( &xMqttContext,
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
|
||||
if( xDemoStatus == pdFAIL )
|
||||
{
|
||||
/* Log error to indicate connection failure. */
|
||||
LogError( ( "Failed to connect to AWS IoT broker." ) );
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Print out a short user guide to the console. The default logging
|
||||
* limit of 255 characters can be changed in demo_logging.c, but breaking
|
||||
* up the only instance of a 1000+ character string is more practical. */
|
||||
LogInfo( ( "\r\n"
|
||||
"/*-----------------------------------------------------------*/\r\n"
|
||||
"\r\n"
|
||||
"The Jobs demo is now ready to accept Jobs.\r\n"
|
||||
"Jobs may be created using the AWS IoT console or AWS CLI.\r\n"
|
||||
"See the following link for more information.\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
"https://docs.aws.amazon.com/cli/latest/reference/iot/create-job.html\r\n"
|
||||
"\r\n"
|
||||
"This demo expects Job documents to have an \"action\" JSON key.\r\n"
|
||||
"The following actions are currently supported:\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - print \r\n"
|
||||
" Logs a message to the local console. The Job document must also contain a \"message\".\r\n"
|
||||
" For example: { \"action\": \"print\", \"message\": \"Hello world!\"} will cause\r\n"
|
||||
" \"Hello world!\" to be printed on the console.\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - publish \r\n"
|
||||
" Publishes a message to an MQTT topic. The Job document must also contain a \"message\" and \"topic\".\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" For example: { \"action\": \"publish\", \"topic\": \"demo/jobs\", \"message\": \"Hello world!\"} will cause\r\n"
|
||||
" \"Hello world!\" to be published to the topic \"demo/jobs\".\r\n" ) );
|
||||
LogInfo( ( "\r"
|
||||
" - exit \r\n"
|
||||
" Exits the demo program. This program will run until { \"action\": \"exit\" } is received.\r\n"
|
||||
"\r\n"
|
||||
"/*-----------------------------------------------------------*/\r\n" ) );
|
||||
|
||||
/* Subscribe to the NextJobExecutionChanged API topic to receive notifications about the next pending
|
||||
* job in the queue for the Thing resource used by this demo. */
|
||||
if( xSubscribeToTopic( &xMqttContext,
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to subscribe to NextJobExecutionChanged API of AWS IoT Jobs service: Topic=%s",
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Publish to AWS IoT Jobs on the StartNextPendingJobExecution API to request the next pending job.
|
||||
*
|
||||
* Note: It is not required to make MQTT subscriptions to the response topics of the
|
||||
* StartNextPendingJobExecution API because the AWS IoT Jobs service sends responses for
|
||||
* the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
|
||||
* to the response topics or not.
|
||||
* This demo processes incoming messages from the response topics of the API in the prvEventCallback()
|
||||
* handler that is supplied to the coreMQTT library. */
|
||||
if( xPublishToTopic( &xMqttContext,
|
||||
START_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
|
||||
NULL,
|
||||
0 ) != pdPASS )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to publish to StartNextPendingJobExecution API of AWS IoT Jobs service: "
|
||||
"Topic=%s", START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
/* Keep on running the demo until we receive a job for the "exit" action to exit the demo. */
|
||||
while( ( xExitActionJobReceived == pdFALSE ) &&
|
||||
( xDemoEncounteredError == pdFALSE ) &&
|
||||
( xDemoStatus == pdPASS ) )
|
||||
{
|
||||
MQTTStatus_t xMqttStatus = MQTTSuccess;
|
||||
|
||||
/* Check if we have notification for the next pending job in the queue from the
|
||||
* NextJobExecutionChanged API of the AWS IoT Jobs service. */
|
||||
xMqttStatus = MQTT_ProcessLoop( &xMqttContext, 300U );
|
||||
|
||||
if( xMqttStatus != MQTTSuccess )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to receive notification about next pending job: "
|
||||
"MQTT_ProcessLoop failed" ) );
|
||||
}
|
||||
}
|
||||
|
||||
/* Increment the demo run count. */
|
||||
uxDemoRunCount++;
|
||||
|
||||
/* Retry demo loop only if there is a failure before completing
|
||||
* the processing of any pending jobs. Any failure in MQTT unsubscribe
|
||||
* or disconnect is considered a failure in demo execution and retry
|
||||
* will not be attempted since a retry without any pending jobs will
|
||||
* make this demo indefinitely wait. */
|
||||
if( ( xDemoStatus == pdFAIL ) || ( xDemoEncounteredError == pdTRUE ) )
|
||||
{
|
||||
if( uxDemoRunCount < JOBS_MAX_DEMO_LOOP_COUNT )
|
||||
{
|
||||
LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
|
||||
retryDemoLoop = pdTRUE;
|
||||
}
|
||||
else
|
||||
{
|
||||
LogError( ( "All %d demo iterations failed.", JOBS_MAX_DEMO_LOOP_COUNT ) );
|
||||
retryDemoLoop = pdFALSE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Reset the flag for demo retry. */
|
||||
retryDemoLoop = pdFALSE;
|
||||
}
|
||||
|
||||
/* Unsubscribe from the NextJobExecutionChanged API topic. */
|
||||
if( xUnsubscribeFromTopic( &xMqttContext,
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to subscribe to NextJobExecutionChanged API of AWS IoT Jobs service: Topic=%s",
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
|
||||
LogError( ( "Failed to subscribe unsubscribe from the NextJobExecutionChanged API of AWS IoT Jobs service: "
|
||||
"Topic=%s", NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Publish to AWS IoT Jobs on the StartNextPendingJobExecution API to request the next pending job.
|
||||
*
|
||||
* Note: It is not required to make MQTT subscriptions to the response topics of the
|
||||
* StartNextPendingJobExecution API because the AWS IoT Jobs service sends responses for
|
||||
* the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
|
||||
* to the response topics or not.
|
||||
* This demo processes incoming messages from the response topics of the API in the prvEventCallback()
|
||||
* handler that is supplied to the coreMQTT library. */
|
||||
if( xPublishToTopic( &xMqttContext,
|
||||
START_NEXT_JOB_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1,
|
||||
NULL,
|
||||
0 ) != pdPASS )
|
||||
/* Disconnect the MQTT and network connections with AWS IoT. */
|
||||
if( xDisconnectMqttSession( &xMqttContext, &xNetworkContext ) != pdPASS )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to publish to StartNextPendingJobExecution API of AWS IoT Jobs service: "
|
||||
"Topic=%s", START_NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) );
|
||||
LogError( ( "Disconnection from AWS IoT failed..." ) );
|
||||
}
|
||||
}
|
||||
|
||||
/* Keep on running the demo until we receive a job for the "exit" action to exit the demo. */
|
||||
while( ( xExitActionJobReceived == pdFALSE ) &&
|
||||
( xDemoEncounteredError == pdFALSE ) &&
|
||||
( xDemoStatus == pdPASS ) )
|
||||
{
|
||||
MQTTStatus_t xMqttStatus = MQTTSuccess;
|
||||
|
||||
/* Check if we have notification for the next pending job in the queue from the
|
||||
* NextJobExecutionChanged API of the AWS IoT Jobs service. */
|
||||
xMqttStatus = MQTT_ProcessLoop( &xMqttContext, 300U );
|
||||
|
||||
if( xMqttStatus != MQTTSuccess )
|
||||
/* Add a delay if a retry is required. */
|
||||
if( retryDemoLoop == pdTRUE )
|
||||
{
|
||||
xDemoStatus = pdFAIL;
|
||||
LogError( ( "Failed to receive notification about next pending job: "
|
||||
"MQTT_ProcessLoop failed" ) );
|
||||
/* Clear the flag that indicates that indicates the demo error
|
||||
* before attempting a retry. */
|
||||
xDemoEncounteredError = pdFALSE;
|
||||
|
||||
LogInfo( ( "A short delay before the next demo iteration." ) );
|
||||
vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
|
||||
}
|
||||
}
|
||||
|
||||
/* Unsubscribe from the NextJobExecutionChanged API topic. */
|
||||
if( xUnsubscribeFromTopic( &xMqttContext,
|
||||
NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ),
|
||||
sizeof( NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) - 1 ) != pdPASS )
|
||||
{
|
||||
xDemoStatus == pdFAIL;
|
||||
LogError( ( "Failed to subscribe unsubscribe from the NextJobExecutionChanged API of AWS IoT Jobs service: "
|
||||
"Topic=%s", NEXT_JOB_EXECUTION_CHANGED_TOPIC( democonfigTHING_NAME ) ) );
|
||||
}
|
||||
|
||||
/* Disconnect the MQTT and network connections with AWS IoT. */
|
||||
if( xDisconnectMqttSession( &xMqttContext, &xNetworkContext ) != pdPASS )
|
||||
{
|
||||
xDemoStatus == pdFAIL;
|
||||
LogError( ( "Disconnection from AWS Iot failed..." ) );
|
||||
}
|
||||
} while( retryDemoLoop == pdTRUE );
|
||||
|
||||
if( ( xDemoEncounteredError == pdFALSE ) && ( xDemoStatus == pdPASS ) )
|
||||
{
|
||||
|
@ -128,6 +128,22 @@ struct NetworkContext
|
||||
TlsTransportParams_t * pParams;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to run the loop in this demo.
|
||||
*
|
||||
* @note The demo loop is attempted to re-run only if it fails in an iteration.
|
||||
* Once the demo loop succeeds in an iteration, the demo exits successfully.
|
||||
*/
|
||||
#ifndef HTTP_MAX_DEMO_LOOP_COUNT
|
||||
#define HTTP_MAX_DEMO_LOOP_COUNT ( 3 )
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Time in ticks to wait between retries of the demo loop if
|
||||
* demo loop fails.
|
||||
*/
|
||||
#define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
|
||||
|
||||
/**
|
||||
* @brief A buffer used in the demo for storing HTTP request headers and
|
||||
* HTTP response headers and body.
|
||||
@ -215,6 +231,7 @@ static void prvHTTPDemoTask( void * pvParameters )
|
||||
NetworkContext_t xNetworkContext = { 0 };
|
||||
TlsTransportParams_t xTlsTransportParams = { 0 };
|
||||
BaseType_t xIsConnectionEstablished = pdFALSE;
|
||||
UBaseType_t uxDemoRunCount = 0UL;
|
||||
|
||||
/* The user of this demo must check the logs for any failure codes. */
|
||||
BaseType_t xDemoStatus = pdPASS;
|
||||
@ -225,55 +242,83 @@ static void prvHTTPDemoTask( void * pvParameters )
|
||||
/* Set the pParams member of the network context with desired transport. */
|
||||
xNetworkContext.pParams = &xTlsTransportParams;
|
||||
|
||||
/**************************** Connect. ******************************/
|
||||
|
||||
/* Attempt to connect to the HTTP server. If connection fails, retry after a
|
||||
* timeout. The timeout value will be exponentially increased until either the
|
||||
* maximum number of attempts or the maximum timeout value is reached. The
|
||||
* function returns pdFAIL if the TCP connection cannot be established with
|
||||
* the broker after configured number of attempts. */
|
||||
xDemoStatus = connectToServerWithBackoffRetries( prvConnectToServer,
|
||||
&xNetworkContext );
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
/* This demo runs a single loop unless there are failures in the demo execution.
|
||||
* In case of failures in the demo execution, demo loop will be retried for up to
|
||||
* HTTP_MAX_DEMO_LOOP_COUNT times. */
|
||||
do
|
||||
{
|
||||
/* Set a flag indicating that a TLS connection exists. */
|
||||
xIsConnectionEstablished = pdTRUE;
|
||||
/**************************** Connect. ******************************/
|
||||
|
||||
/* Define the transport interface. */
|
||||
xTransportInterface.pNetworkContext = &xNetworkContext;
|
||||
xTransportInterface.send = TLS_FreeRTOS_send;
|
||||
xTransportInterface.recv = TLS_FreeRTOS_recv;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Log error to indicate connection failure after all
|
||||
* reconnect attempts are over. */
|
||||
LogError( ( "Failed to connect to HTTP server %.*s.",
|
||||
( int32_t ) AWS_IOT_ENDPOINT_LENGTH,
|
||||
democonfigAWS_IOT_ENDPOINT ) );
|
||||
}
|
||||
/* Attempt to connect to the HTTP server. If connection fails, retry after a
|
||||
* timeout. The timeout value will be exponentially increased until either the
|
||||
* maximum number of attempts or the maximum timeout value is reached. The
|
||||
* function returns pdFAIL if the TCP connection cannot be established with
|
||||
* the broker after configured number of attempts. */
|
||||
xDemoStatus = connectToServerWithBackoffRetries( prvConnectToServer,
|
||||
&xNetworkContext );
|
||||
|
||||
/*********************** Send HTTP request.************************/
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
/* Set a flag indicating that a TLS connection exists. */
|
||||
xIsConnectionEstablished = pdTRUE;
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = prvSendHttpRequest( &xTransportInterface,
|
||||
HTTP_METHOD_POST,
|
||||
( sizeof( HTTP_METHOD_POST ) - 1 ),
|
||||
democonfigPOST_PATH,
|
||||
( sizeof( democonfigPOST_PATH ) - 1 ) );
|
||||
}
|
||||
/* Define the transport interface. */
|
||||
xTransportInterface.pNetworkContext = &xNetworkContext;
|
||||
xTransportInterface.send = TLS_FreeRTOS_send;
|
||||
xTransportInterface.recv = TLS_FreeRTOS_recv;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Log error to indicate connection failure after all
|
||||
* reconnect attempts are over. */
|
||||
LogError( ( "Failed to connect to HTTP server %.*s.",
|
||||
( int32_t ) AWS_IOT_ENDPOINT_LENGTH,
|
||||
democonfigAWS_IOT_ENDPOINT ) );
|
||||
}
|
||||
|
||||
/**************************** Disconnect. ******************************/
|
||||
/*********************** Send HTTP request.************************/
|
||||
|
||||
/* Close the network connection to clean up any system resources that the
|
||||
* demo may have consumed. */
|
||||
if( xIsConnectionEstablished == pdTRUE )
|
||||
{
|
||||
/* Close the network connection. */
|
||||
TLS_FreeRTOS_Disconnect( &xNetworkContext );
|
||||
}
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = prvSendHttpRequest( &xTransportInterface,
|
||||
HTTP_METHOD_POST,
|
||||
( sizeof( HTTP_METHOD_POST ) - 1 ),
|
||||
democonfigPOST_PATH,
|
||||
( sizeof( democonfigPOST_PATH ) - 1 ) );
|
||||
}
|
||||
|
||||
/**************************** Disconnect. ******************************/
|
||||
|
||||
/* Close the network connection to clean up any system resources that the
|
||||
* demo may have consumed. */
|
||||
if( xIsConnectionEstablished == pdTRUE )
|
||||
{
|
||||
/* Close the network connection. */
|
||||
TLS_FreeRTOS_Disconnect( &xNetworkContext );
|
||||
}
|
||||
|
||||
/*********************** Retry in case of failure. ************************/
|
||||
|
||||
/* Increment the demo run count. */
|
||||
uxDemoRunCount++;
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Demo iteration %lu was successful.", uxDemoRunCount ) );
|
||||
}
|
||||
/* Attempt to retry a failed demo iteration for up to #HTTP_MAX_DEMO_LOOP_COUNT times. */
|
||||
else if( uxDemoRunCount < HTTP_MAX_DEMO_LOOP_COUNT )
|
||||
{
|
||||
LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
|
||||
vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
|
||||
}
|
||||
/* Failed all #HTTP_MAX_DEMO_LOOP_COUNT demo iterations. */
|
||||
else
|
||||
{
|
||||
LogError( ( "All %d demo iterations failed.", HTTP_MAX_DEMO_LOOP_COUNT ) );
|
||||
break;
|
||||
}
|
||||
} while( xDemoStatus != pdPASS );
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
|
@ -170,6 +170,22 @@ struct NetworkContext
|
||||
PlaintextTransportParams_t * pParams;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief The maximum number of times to run the loop in this demo.
|
||||
*
|
||||
* @note The demo loop is attempted to re-run only if it fails in an iteration.
|
||||
* Once the demo loop succeeds in an iteration, the demo exits successfully.
|
||||
*/
|
||||
#ifndef HTTP_MAX_DEMO_LOOP_COUNT
|
||||
#define HTTP_MAX_DEMO_LOOP_COUNT ( 3 )
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Time in ticks to wait between retries of the demo loop if
|
||||
* demo loop fails.
|
||||
*/
|
||||
#define DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS ( pdMS_TO_TICKS( 5000U ) )
|
||||
|
||||
/**
|
||||
* @brief A pair containing a path string of the URI and its length.
|
||||
*/
|
||||
@ -294,6 +310,7 @@ static void prvHTTPDemoTask( void * pvParameters )
|
||||
};
|
||||
BaseType_t xIsConnectionEstablished = pdFALSE;
|
||||
UBaseType_t uxHttpPathCount = 0U;
|
||||
UBaseType_t uxDemoRunCount = 0UL;
|
||||
|
||||
/* The user of this demo must check the logs for any failure codes. */
|
||||
BaseType_t xDemoStatus = pdPASS;
|
||||
@ -304,62 +321,90 @@ static void prvHTTPDemoTask( void * pvParameters )
|
||||
/* Set the pParams member of the network context with desired transport. */
|
||||
xNetworkContext.pParams = &xPlaintextTransportParams;
|
||||
|
||||
/**************************** Connect. ******************************/
|
||||
|
||||
/* Attempt to connect to the HTTP server. If connection fails, retry after a
|
||||
* timeout. The timeout value will be exponentially increased until either the
|
||||
* maximum number of attempts or the maximum timeout value is reached. The
|
||||
* function returns pdFAIL if the TCP connection cannot be established with
|
||||
* the broker after configured number of attempts. */
|
||||
xDemoStatus = connectToServerWithBackoffRetries( prvConnectToServer,
|
||||
&xNetworkContext );
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
/* This demo runs a single loop unless there are failures in the demo execution.
|
||||
* In case of failures in the demo execution, demo loop will be retried for up to
|
||||
* HTTP_MAX_DEMO_LOOP_COUNT times. */
|
||||
do
|
||||
{
|
||||
/* Set a flag indicating that a TCP connection has been established. */
|
||||
xIsConnectionEstablished = pdTRUE;
|
||||
/**************************** Connect. ******************************/
|
||||
|
||||
/* Define the transport interface. */
|
||||
xTransportInterface.pNetworkContext = &xNetworkContext;
|
||||
xTransportInterface.send = Plaintext_FreeRTOS_send;
|
||||
xTransportInterface.recv = Plaintext_FreeRTOS_recv;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Log error to indicate connection failure after all
|
||||
* reconnect attempts are over. */
|
||||
LogError( ( "Failed to connect to HTTP server %.*s.",
|
||||
( int32_t ) httpexampleSERVER_HOSTNAME_LENGTH,
|
||||
democonfigSERVER_HOSTNAME ) );
|
||||
}
|
||||
/* Attempt to connect to the HTTP server. If connection fails, retry after a
|
||||
* timeout. The timeout value will be exponentially increased until either the
|
||||
* maximum number of attempts or the maximum timeout value is reached. The
|
||||
* function returns pdFAIL if the TCP connection cannot be established with
|
||||
* the broker after configured number of attempts. */
|
||||
xDemoStatus = connectToServerWithBackoffRetries( prvConnectToServer,
|
||||
&xNetworkContext );
|
||||
|
||||
/*********************** Send HTTP request.************************/
|
||||
|
||||
for( uxHttpPathCount = 0; uxHttpPathCount < httpexampleNUMBER_HTTP_PATHS; ++uxHttpPathCount )
|
||||
{
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = prvSendHttpRequest( &xTransportInterface,
|
||||
xHttpMethods[ uxHttpPathCount ].pcHttpMethod,
|
||||
xHttpMethods[ uxHttpPathCount ].ulHttpMethodLength,
|
||||
xHttpMethodPaths[ uxHttpPathCount ].pcHttpPath,
|
||||
xHttpMethodPaths[ uxHttpPathCount ].ulHttpPathLength );
|
||||
/* Set a flag indicating that a TCP connection has been established. */
|
||||
xIsConnectionEstablished = pdTRUE;
|
||||
|
||||
/* Define the transport interface. */
|
||||
xTransportInterface.pNetworkContext = &xNetworkContext;
|
||||
xTransportInterface.send = Plaintext_FreeRTOS_send;
|
||||
xTransportInterface.recv = Plaintext_FreeRTOS_recv;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Log error to indicate connection failure after all
|
||||
* reconnect attempts are over. */
|
||||
LogError( ( "Failed to connect to HTTP server %.*s.",
|
||||
( int32_t ) httpexampleSERVER_HOSTNAME_LENGTH,
|
||||
democonfigSERVER_HOSTNAME ) );
|
||||
}
|
||||
|
||||
/*********************** Send HTTP request.************************/
|
||||
|
||||
for( uxHttpPathCount = 0; uxHttpPathCount < httpexampleNUMBER_HTTP_PATHS; ++uxHttpPathCount )
|
||||
{
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
xDemoStatus = prvSendHttpRequest( &xTransportInterface,
|
||||
xHttpMethods[ uxHttpPathCount ].pcHttpMethod,
|
||||
xHttpMethods[ uxHttpPathCount ].ulHttpMethodLength,
|
||||
xHttpMethodPaths[ uxHttpPathCount ].pcHttpPath,
|
||||
xHttpMethodPaths[ uxHttpPathCount ].ulHttpPathLength );
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**************************** Disconnect. ******************************/
|
||||
|
||||
/* Close the network connection to clean up any system resources that the
|
||||
* demo may have consumed. */
|
||||
if( xIsConnectionEstablished == pdTRUE )
|
||||
{
|
||||
/* Close the network connection. */
|
||||
Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
|
||||
}
|
||||
|
||||
/*********************** Retry in case of failure. ************************/
|
||||
|
||||
/* Increment the demo run count. */
|
||||
uxDemoRunCount++;
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
LogInfo( ( "Demo iteration %lu was successful.", uxDemoRunCount ) );
|
||||
}
|
||||
/* Attempt to retry a failed demo iteration for up to #HTTP_MAX_DEMO_LOOP_COUNT times. */
|
||||
else if( uxDemoRunCount < HTTP_MAX_DEMO_LOOP_COUNT )
|
||||
{
|
||||
LogWarn( ( "Demo iteration %lu failed. Retrying...", uxDemoRunCount ) );
|
||||
vTaskDelay( DELAY_BETWEEN_DEMO_RETRY_ITERATIONS_TICKS );
|
||||
}
|
||||
/* Failed all #HTTP_MAX_DEMO_LOOP_COUNT demo iterations. */
|
||||
else
|
||||
{
|
||||
LogError( ( "All %d demo iterations failed.", HTTP_MAX_DEMO_LOOP_COUNT ) );
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**************************** Disconnect. ******************************/
|
||||
|
||||
/* Close the network connection to clean up any system resources that the
|
||||
* demo may have consumed. */
|
||||
if( xIsConnectionEstablished == pdTRUE )
|
||||
{
|
||||
/* Close the network connection. */
|
||||
Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
|
||||
}
|
||||
} while( xDemoStatus != pdPASS );
|
||||
|
||||
if( xDemoStatus == pdPASS )
|
||||
{
|
||||
|
Submodule FreeRTOS-Plus/Source/coreJSON updated: 5bfadf5f86...c3ed2a23a6
Reference in New Issue
Block a user