tools(unit-test-app): Modify for ESP8266

1. Remove unused or unsupported unit tests
2. Add extra header files to make compilation pass
3. Remove unsupported functions
dongheng
2019-03-18 13:04:01 +08:00
parent 6889537951
commit b522e9a0e1
38 changed files with 215 additions and 3430 deletions


@ -1,4 +0,0 @@
#
#Component Makefile
#
COMPONENT_ADD_LDFLAGS = -Wl,--whole-archive -l$(COMPONENT_NAME) -Wl,--no-whole-archive


@ -1,94 +0,0 @@
/*
 * Tests for bootloader_support esp_image_load(ESP_IMAGE_VERIFY, ...)
*/
#include <esp_types.h>
#include <stdio.h>
#include "string.h"
#include "rom/ets_sys.h"
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "freertos/queue.h"
#include "freertos/xtensa_api.h"
#include "unity.h"
#include "bootloader_common.h"
#include "esp_partition.h"
#include "esp_ota_ops.h"
#include "esp_image_format.h"
TEST_CASE("Verify bootloader image in flash", "[bootloader_support]")
{
const esp_partition_pos_t fake_bootloader_partition = {
.offset = ESP_BOOTLOADER_OFFSET,
.size = ESP_PARTITION_TABLE_OFFSET - ESP_BOOTLOADER_OFFSET,
};
esp_image_metadata_t data = { 0 };
TEST_ASSERT_EQUAL_HEX(ESP_OK, esp_image_load(ESP_IMAGE_VERIFY, &fake_bootloader_partition, &data));
TEST_ASSERT_NOT_EQUAL(0, data.image_len);
uint32_t bootloader_length = 0;
TEST_ASSERT_EQUAL_HEX(ESP_OK, esp_image_verify_bootloader(&bootloader_length));
TEST_ASSERT_EQUAL(data.image_len, bootloader_length);
}
TEST_CASE("Verify unit test app image", "[bootloader_support]")
{
esp_image_metadata_t data = { 0 };
const esp_partition_t *running = esp_ota_get_running_partition();
TEST_ASSERT_NOT_EQUAL(NULL, running);
const esp_partition_pos_t running_pos = {
.offset = running->address,
.size = running->size,
};
TEST_ASSERT_EQUAL_HEX(ESP_OK, esp_image_load(ESP_IMAGE_VERIFY, &running_pos, &data));
TEST_ASSERT_NOT_EQUAL(0, data.image_len);
TEST_ASSERT_TRUE(data.image_len <= running->size);
}
void check_label_search (int num_test, const char *list, const char *t_label, bool result)
{
// gen_esp32part.py trims labels to 16 characters,
// and the resulting string may not be null-terminated.
// Truncate here the same way the generator does.
char label[16 + 1] = {0};
strncpy(label, t_label, sizeof(label) - 1);
bool ret = bootloader_common_label_search(list, label);
if (ret != result) {
printf("%d) %s | %s \n", num_test, list, label);
}
TEST_ASSERT_MESSAGE(ret == result, "Test failed");
}
TEST_CASE("Test label_search", "[bootloader_support]")
{
TEST_ASSERT_FALSE(bootloader_common_label_search(NULL, NULL));
TEST_ASSERT_FALSE(bootloader_common_label_search("nvs", NULL));
check_label_search(1, "nvs", "nvs", true);
check_label_search(2, "nvs, ", "nvs", true);
check_label_search(3, "nvs1", "nvs", false);
check_label_search(3, "nvs1, ", "nvs", false);
check_label_search(4, "nvs1nvs1, phy", "nvs1", false);
check_label_search(5, "nvs1, nvs1, phy", "nvs1", true);
check_label_search(6, "nvs12, nvs12, phy", "nvs1", false);
check_label_search(7, "nvs12, nvs1, phy", "nvs1", true);
check_label_search(8, "nvs12, nvs3, phy, nvs1","nvs1", true);
check_label_search(9, "nvs1nvs1, phy, nvs", "nvs", true);
check_label_search(10, "nvs1nvs1, phy, nvs1", "nvs", false);
check_label_search(11, "nvs1, nvs, phy, nvs1", "nvs", true);
check_label_search(12, "nvs1, nvs2, phy, nvs","nvs", true);
check_label_search(13, "ota_data, backup_nvs", "nvs", false);
check_label_search(14, "nvs1, nvs2, ota, nvs", "vs1", false);
check_label_search(20, "12345678901234, phy, nvs1", "12345678901234", true);
check_label_search(21, "123456789012345, phy, nvs1", "123456789012345", true);
check_label_search(22, "1234567890123456, phy, nvs1", "1234567890123456", true);
check_label_search(23, "12345678901234567, phy, nvs1", "12345678901234567", false);
check_label_search(24, "1234567890123456, phy, nvs1", "12345678901234567", true);
check_label_search(25, "phy, 1234567890123456, nvs1", "12345678901234567", true);
}


@ -0,0 +1,25 @@
// Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
/**
* @brief Return current CPU clock frequency
* When frequency switching is performed, this frequency may change.
 * However, it is guaranteed that the frequency never changes within a critical
 * section.
*
* @return CPU clock frequency, in Hz
*/
int esp_clk_cpu_freq(void);
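Illustrative usage sketch (not part of this commit): converting the frequency reported by the function declared above into a cycle count; the helper name below is hypothetical.

/* Sketch only: assumes <stdint.h> and the header above are included. */
static inline uint32_t example_us_to_cpu_cycles(uint32_t us)
{
    /* cycles = frequency_in_Hz * microseconds / 1e6 */
    return (uint32_t)(((uint64_t)esp_clk_cpu_freq() * us) / 1000000ULL);
}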


@ -43,6 +43,14 @@ extern "C" {
#define ETS_WDT_INUM 8
#define ETS_FRC_TIMER1_INUM 9
typedef enum {
OK = 0,
FAIL,
PENDING,
BUSY,
CANCEL,
} STATUS;
extern char NMIIrqIsOn;
extern uint32_t WDEV_INTEREST_EVENT;


@ -0,0 +1,127 @@
// Copyright 2010-2016 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _ROM_UART_H_
#define _ROM_UART_H_
#include "esp_types.h"
#include "esp_attr.h"
#include "ets_sys.h"
#include "esp8266/uart_struct.h"
#include "esp8266/uart_register.h"
#include "esp8266/pin_mux_register.h"
#include "esp8266/eagle_soc.h"
#include "esp8266/rom_functions.h"
#include "driver/soc.h"
#ifdef __cplusplus
extern "C" {
#endif
/** \defgroup uart_apis, uart configuration and communication related apis
* @brief uart apis
*/
/** @addtogroup uart_apis
* @{
*/
/**
 * @brief Wait until the UART TX FIFO is empty and the last character has been sent out.
 *
 * @param uart_no : 0 for UART0, 1 for UART1
 *
 * The version of this function in ROM code has a bug, so the correct version is
 * defined here for compatibility.
*/
static inline void IRAM_ATTR uart_tx_wait_idle(uint8_t uart_no) {
uint32_t tx_bytes;
uint32_t baudrate, byte_delay_us;
uart_dev_t *const UART[2] = {&uart0, &uart1};
uart_dev_t *const uart = UART[uart_no];
baudrate = (UART_CLK_FREQ / (uart->clk_div.val & 0xFFFFF));
byte_delay_us = (uint32_t)(10000000 / baudrate);
do {
tx_bytes = uart->status.txfifo_cnt;
/* either tx count or state is non-zero */
} while (tx_bytes);
ets_delay_us(byte_delay_us);
}
/**
 * @brief Output a character to the printf channel, waiting until the FIFO is not full.
 *
 * @param TxChar : character to output
*
* @return OK.
*/
STATUS uart_tx_one_char(uint8_t TxChar);
/**
* @brief Get an input char from message channel.
* Please do not call this function in SDK.
*
* @param uint8_t *pRxChar : the pointer to store the char.
*
 * @return OK on success.
 *         FAIL on failure.
*/
STATUS uart_rx_one_char(uint8_t *pRxChar);
/**
* @brief Get an input string line from message channel.
* Please do not call this function in SDK.
*
* @param uint8_t *pString : the pointer to store the string.
*
 * @param uint8_t MaxStrlen : the max string length, including '\0'.
*
* @return OK.
*/
static inline STATUS UartRxString(uint8_t *pString, uint8_t MaxStrlen)
{
int rx_bytes = 0;
while(1) {
uint8_t data;
while (uart_rx_one_char(&data) != OK);
if (data == '\n' || data == '\r')
data = '\0';
pString[rx_bytes++] = data;
if (data == '\0')
return OK;
if (rx_bytes >= MaxStrlen)
return FAIL;
}
return OK;
}
/**
* @}
*/
#ifdef __cplusplus
}
#endif
#endif /* _ROM_UART_H_ */
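Illustrative sketch (not part of this commit) showing how the helpers above could be combined to echo one line back over UART0; the buffer size is arbitrary.

/* Sketch only: assumes the header above is included. */
static void example_echo_line(void)
{
    uint8_t line[32];
    if (UartRxString(line, sizeof(line)) == OK) {    /* reads until '\n' or '\r' */
        for (const uint8_t *p = line; *p != '\0'; ++p) {
            uart_tx_one_char(*p);                    /* blocks while the TX FIFO is full */
        }
        uart_tx_wait_idle(0);                        /* drain UART0 before continuing */
    }
}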


@ -0,0 +1,40 @@
// Copyright 2010-2016 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _SOC_CPU_H
#define _SOC_CPU_H
#include <stdint.h>
#include <stdbool.h>
#include <stddef.h>
#include "xtensa/corebits.h"
#include "xtensa/config/core.h"
/* C macros for xtensa special register read/write/exchange */
#define RSR(reg, curval) asm volatile ("rsr %0, " #reg : "=r" (curval));
#define WSR(reg, newval) asm volatile ("wsr %0, " #reg : : "r" (newval));
#define XSR(reg, swapval) asm volatile ("xsr %0, " #reg : "+r" (swapval));
/** @brief Read current stack pointer address
*
*/
static inline void *get_sp()
{
void *sp;
asm volatile ("mov %0, sp;" : "=r" (sp));
return sp;
}
#endif
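Illustrative sketch (not part of this commit): reading the Xtensa cycle counter with the RSR macro above and estimating stack usage with get_sp(); the stack_top argument is a hypothetical caller-supplied value.

/* Sketch only: assumes the header above is included. */
static inline uint32_t example_read_ccount(void)
{
    uint32_t ccount;
    RSR(CCOUNT, ccount);    /* read the CCOUNT cycle-counter special register */
    return ccount;
}

static inline size_t example_stack_bytes_used(void *stack_top)
{
    /* stacks grow downward on Xtensa, so usage is top minus current SP */
    return (size_t)((char *)stack_top - (char *)get_sp());
}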


@ -58,3 +58,4 @@ PROVIDE ( gpio_pin_wakeup_disable = 0x40004ed4 );
PROVIDE ( gpio_pin_wakeup_enable = 0x40004e90 );
PROVIDE ( ets_io_vprintf = 0x40001f00 );
PROVIDE ( uart_rx_one_char = 0x40003b8c );


@ -1,6 +0,0 @@
set(COMPONENT_SRCDIRS ".")
set(COMPONENT_ADD_INCLUDEDIRS ".")
set(COMPONENT_REQUIRES unity esp_http_server)
register_component()


@ -1 +0,0 @@
COMPONENT_ADD_LDFLAGS = -Wl,--whole-archive -l$(COMPONENT_NAME) -Wl,--no-whole-archive


@ -1,169 +0,0 @@
// Copyright 2018 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdlib.h>
#include <stdbool.h>
#include <esp_system.h>
#include <esp_http_server.h>
#include "unity.h"
#include "test_utils.h"
int pre_start_mem, post_stop_mem, post_stop_min_mem;
bool basic_sanity = true;
esp_err_t null_func(httpd_req_t *req)
{
return ESP_OK;
}
httpd_uri_t handler_limit_uri (char* path)
{
httpd_uri_t uri = {
.uri = path,
.method = HTTP_GET,
.handler = null_func,
.user_ctx = NULL,
};
return uri;
};
static inline unsigned num_digits(unsigned x)
{
unsigned digits = 1;
while ((x = x/10) != 0) {
digits++;
}
return digits;
}
#define HTTPD_TEST_MAX_URI_HANDLERS 8
void test_handler_limit(httpd_handle_t hd)
{
int i;
char x[HTTPD_TEST_MAX_URI_HANDLERS+1][num_digits(HTTPD_TEST_MAX_URI_HANDLERS)+1];
httpd_uri_t uris[HTTPD_TEST_MAX_URI_HANDLERS+1];
for (i = 0; i < HTTPD_TEST_MAX_URI_HANDLERS + 1; i++) {
sprintf(x[i],"%d",i);
uris[i] = handler_limit_uri(x[i]);
}
/* Register multiple instances of the same handler for MAX URI Handlers */
for (i = 0; i < HTTPD_TEST_MAX_URI_HANDLERS; i++) {
TEST_ASSERT(httpd_register_uri_handler(hd, &uris[i]) == ESP_OK);
}
/* Register the MAX URI + 1 Handlers should fail */
TEST_ASSERT(httpd_register_uri_handler(hd, &uris[HTTPD_TEST_MAX_URI_HANDLERS]) != ESP_OK);
/* Unregistering one of the handlers should pass */
TEST_ASSERT(httpd_unregister_uri_handler(hd, uris[0].uri, uris[0].method) == ESP_OK);
/* Unregistering a handler that was never added should fail */
TEST_ASSERT(httpd_unregister_uri_handler(hd, uris[0].uri, uris[0].method) != ESP_OK);
/* Register the MAX URI Handler should pass */
TEST_ASSERT(httpd_register_uri_handler(hd, &uris[0]) == ESP_OK);
/* Reregister same instance of handler should fail */
TEST_ASSERT(httpd_register_uri_handler(hd, &uris[0]) != ESP_OK);
/* Register the MAX URI + 1 Handlers should fail */
TEST_ASSERT(httpd_register_uri_handler(hd, &uris[HTTPD_TEST_MAX_URI_HANDLERS]) != ESP_OK);
/* Unregister the same handler for MAX URI Handlers */
for (i = 0; i < HTTPD_TEST_MAX_URI_HANDLERS; i++) {
TEST_ASSERT(httpd_unregister_uri_handler(hd, uris[i].uri, uris[i].method) == ESP_OK);
}
basic_sanity = false;
}
/********************* Test Handler Limit End *******************/
httpd_handle_t test_httpd_start(uint16_t id)
{
httpd_handle_t hd;
httpd_config_t config = HTTPD_DEFAULT_CONFIG();
config.max_uri_handlers = HTTPD_TEST_MAX_URI_HANDLERS;
config.server_port += id;
config.ctrl_port += id;
TEST_ASSERT(httpd_start(&hd, &config) == ESP_OK);
return hd;
}
#define SERVER_INSTANCES 2
/* Currently this only tests for the number of tasks.
* Heap leakage is not tested as LWIP allocates memory
* which may not be freed immedietly causing erroneous
* evaluation. Another test to implement would be the
* monitoring of open sockets, but LWIP presently provides
* no such API for getting the number of open sockets.
*/
TEST_CASE("Leak Test", "[HTTP SERVER]")
{
httpd_handle_t hd[SERVER_INSTANCES];
unsigned task_count;
bool res = true;
test_case_uses_tcpip();
task_count = uxTaskGetNumberOfTasks();
printf("Initial task count: %d\n", task_count);
pre_start_mem = esp_get_free_heap_size();
for (int i = 0; i < SERVER_INSTANCES; i++) {
hd[i] = test_httpd_start(i);
vTaskDelay(10);
unsigned num_tasks = uxTaskGetNumberOfTasks();
task_count++;
if (num_tasks != task_count) {
printf("Incorrect task count (starting): %d expected %d\n",
num_tasks, task_count);
res = false;
}
}
for (int i = 0; i < SERVER_INSTANCES; i++) {
if (httpd_stop(hd[i]) != ESP_OK) {
printf("Failed to stop httpd task %d\n", i);
res = false;
}
vTaskDelay(10);
unsigned num_tasks = uxTaskGetNumberOfTasks();
task_count--;
if (num_tasks != task_count) {
printf("Incorrect task count (stopping): %d expected %d\n",
num_tasks, task_count);
res = false;
}
}
post_stop_mem = esp_get_free_heap_size();
TEST_ASSERT(res == true);
}
TEST_CASE("Basic Functionality Tests", "[HTTP SERVER]")
{
httpd_handle_t hd;
httpd_config_t config = HTTPD_DEFAULT_CONFIG();
test_case_uses_tcpip();
TEST_ASSERT(httpd_start(&hd, &config) == ESP_OK);
test_handler_limit(hd);
TEST_ASSERT(httpd_stop(hd) == ESP_OK);
}


@ -163,6 +163,14 @@ void TASK_SW_ATTR xPortSysTickHandle(void)
}
}
/**
* @brief Return current CPU clock frequency
*/
int esp_clk_cpu_freq(void)
{
return _xt_tick_divisor * XT_TICK_PER_SEC;
}
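As a hedged sanity-check sketch (assumptions, not taken from the commit: an 80 MHz ESP8266 core with CONFIG_FREERTOS_HZ=1000, so XT_TICK_PER_SEC is 1000 and _xt_tick_divisor works out to 80000 cycles per OS tick):

/* Sketch only: assumes <assert.h> and the function defined above. */
static void example_check_cpu_freq(void)
{
    /* 80000 cycles per OS tick * 1000 OS ticks per second == 80 MHz */
    assert(esp_clk_cpu_freq() == 80000000);
}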
/*
* See header file for description.
*/


@ -1,5 +0,0 @@
#
#Component Makefile
#
COMPONENT_ADD_LDFLAGS = -Wl,--whole-archive -l$(COMPONENT_NAME) -Wl,--no-whole-archive


@ -1,97 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include "unity.h"
#include "test_utils.h"
#include "esp_partition.h"
TEST_CASE("Can read partition table", "[partition]")
{
const esp_partition_t *p = esp_partition_find_first(ESP_PARTITION_TYPE_APP, ESP_PARTITION_SUBTYPE_ANY, NULL);
TEST_ASSERT_NOT_NULL(p);
TEST_ASSERT_EQUAL(0x10000, p->address);
TEST_ASSERT_EQUAL(ESP_PARTITION_SUBTYPE_APP_FACTORY, p->subtype);
esp_partition_iterator_t it = esp_partition_find(ESP_PARTITION_TYPE_DATA, ESP_PARTITION_SUBTYPE_ANY, NULL);
TEST_ASSERT_NOT_NULL(it);
int count = 0;
const esp_partition_t* prev = NULL;
for (; it != NULL; it = esp_partition_next(it)) {
const esp_partition_t *p = esp_partition_get(it);
TEST_ASSERT_NOT_NULL(p);
if (prev) {
TEST_ASSERT_TRUE_MESSAGE(prev->address < p->address, "incorrect partition order");
}
prev = p;
++count;
}
esp_partition_iterator_release(it);
TEST_ASSERT_EQUAL(4, count);
}
TEST_CASE("Can write, read, mmap partition", "[partition][ignore]")
{
const esp_partition_t *p = get_test_data_partition();
printf("Using partition %s at 0x%x, size 0x%x\n", p->label, p->address, p->size);
TEST_ASSERT_NOT_NULL(p);
const size_t max_size = 2 * SPI_FLASH_SEC_SIZE;
uint8_t *data = (uint8_t *) malloc(max_size);
TEST_ASSERT_NOT_NULL(data);
TEST_ASSERT_EQUAL(ESP_OK, esp_partition_erase_range(p, 0, p->size));
srand(0);
size_t block_size;
for (size_t offset = 0; offset < p->size; offset += block_size) {
block_size = ((rand() + 4) % max_size) & (~0x3);
size_t left = p->size - offset;
if (block_size > left) {
block_size = left;
}
for (size_t i = 0; i < block_size / 4; ++i) {
((uint32_t *) (data))[i] = rand();
}
TEST_ASSERT_EQUAL(ESP_OK, esp_partition_write(p, offset, data, block_size));
}
srand(0);
for (size_t offset = 0; offset < p->size; offset += block_size) {
block_size = ((rand() + 4) % max_size) & (~0x3);
size_t left = p->size - offset;
if (block_size > left) {
block_size = left;
}
TEST_ASSERT_EQUAL(ESP_OK, esp_partition_read(p, offset, data, block_size));
for (size_t i = 0; i < block_size / 4; ++i) {
TEST_ASSERT_EQUAL(rand(), ((uint32_t *) data)[i]);
}
}
free(data);
const uint32_t *mmap_data;
spi_flash_mmap_handle_t mmap_handle;
size_t begin = 3000;
size_t size = 64000; //chosen so size is smaller than 64K but the mmap straddles 2 MMU blocks
TEST_ASSERT_EQUAL(ESP_OK, esp_partition_mmap(p, begin, size, SPI_FLASH_MMAP_DATA,
(const void **)&mmap_data, &mmap_handle));
srand(0);
for (size_t offset = 0; offset < p->size; offset += block_size) {
block_size = ((rand() + 4) % max_size) & (~0x3);
size_t left = p->size - offset;
if (block_size > left) {
block_size = left;
}
for (size_t i = 0; i < block_size / 4; ++i) {
size_t pos = offset + i * 4;
uint32_t expected = rand();
if (pos < begin || pos >= (begin + size)) {
continue;
}
TEST_ASSERT_EQUAL(expected, mmap_data[(pos - begin) / 4]);
}
}
spi_flash_munmap(mmap_handle);
}


@ -1,7 +0,0 @@
set(COMPONENT_SRCDIRS ".")
set(COMPONENT_ADD_INCLUDEDIRS ".")
set(COMPONENT_PRIV_INCLUDEDIRS "../proto-c/")
set(COMPONENT_REQUIRES unity mbedtls protocomm protobuf-c)
register_component()


@ -1,2 +0,0 @@
COMPONENT_PRIV_INCLUDEDIRS := ../proto-c/
COMPONENT_ADD_LDFLAGS = -Wl,--whole-archive -l$(COMPONENT_NAME) -Wl,--no-whole-archive

File diff suppressed because it is too large


@ -1,170 +0,0 @@
// Copyright 2017 Espressif Systems (Shanghai) PTE LTD
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/* Unit tests need to have access to reliable timestamps even if CPU and APB
* clock frequencies change over time. This reference clock is built upon two
* peripherals: one RMT channel and one PCNT channel, plus one GPIO to connect
* these peripherals.
*
* RMT channel is configured to use REF_TICK as clock source, which is a 1 MHz
* clock derived from APB_CLK using a set of dividers. The divider is changed
* automatically by hardware depending on the current clock source of APB_CLK.
* For example, if APB_CLK is derived from PLL, one divider is used, and when
* APB_CLK is derived from XTAL, another divider is used. RMT channel clocked
* by REF_TICK is configured to generate a continuous 0.5 MHz signal, which is
* connected to a GPIO. PCNT takes the input signal from this GPIO and counts
* the edges (which occur at 1MHz frequency). PCNT counter is only 16 bit wide,
* so an interrupt is configured to trigger when the counter reaches 30000,
* incrementing a 32-bit millisecond counter maintained by software.
* Together these two counters may be used at any time to obtain the timestamp.
*/
#include "test_utils.h"
#include "soc/rmt_struct.h"
#include "soc/pcnt_struct.h"
#include "soc/pcnt_reg.h"
#include "soc/gpio_sig_map.h"
#include "soc/dport_reg.h"
#include "rom/gpio.h"
#include "rom/ets_sys.h"
#include "driver/gpio.h"
#include "esp_intr_alloc.h"
#include "freertos/FreeRTOS.h"
#include "driver/periph_ctrl.h"
/* Select which RMT and PCNT channels, and GPIO to use */
#define REF_CLOCK_RMT_CHANNEL 7
#define REF_CLOCK_PCNT_UNIT 0
#define REF_CLOCK_GPIO 21
#define REF_CLOCK_PRESCALER_MS 30
static void IRAM_ATTR pcnt_isr(void* arg);
static intr_handle_t s_intr_handle;
static portMUX_TYPE s_lock = portMUX_INITIALIZER_UNLOCKED;
static volatile uint32_t s_milliseconds;
void ref_clock_init()
{
assert(s_intr_handle == NULL && "already initialized");
// Route RMT output to GPIO matrix
gpio_matrix_out(REF_CLOCK_GPIO, RMT_SIG_OUT0_IDX + REF_CLOCK_RMT_CHANNEL, false, false);
// Initialize RMT
periph_module_enable(PERIPH_RMT_MODULE);
RMT.apb_conf.fifo_mask = 1;
rmt_item32_t data = {
.duration0 = 1,
.level0 = 1,
.duration1 = 0,
.level1 = 0
};
RMTMEM.chan[REF_CLOCK_RMT_CHANNEL].data32[0] = data;
RMTMEM.chan[REF_CLOCK_RMT_CHANNEL].data32[1].val = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf0.clk_en = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.tx_start = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.mem_owner = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.mem_rd_rst = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.apb_mem_rst = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf0.carrier_en = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf0.div_cnt = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf0.mem_size = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.ref_always_on = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.tx_conti_mode = 1;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.tx_start = 1;
// Route signal to PCNT
int pcnt_sig_idx = (REF_CLOCK_PCNT_UNIT < 5) ?
PCNT_SIG_CH0_IN0_IDX + 4 * REF_CLOCK_PCNT_UNIT :
PCNT_SIG_CH0_IN5_IDX + 4 * (REF_CLOCK_PCNT_UNIT - 5);
gpio_matrix_in(REF_CLOCK_GPIO, pcnt_sig_idx, false);
if (REF_CLOCK_GPIO != 20) {
PIN_INPUT_ENABLE(GPIO_PIN_MUX_REG[REF_CLOCK_GPIO]);
} else {
PIN_INPUT_ENABLE(PERIPHS_IO_MUX_GPIO20_U);
}
// Initialize PCNT
periph_module_enable(PERIPH_PCNT_MODULE);
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.ch0_hctrl_mode = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.ch0_lctrl_mode = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.ch0_pos_mode = 1;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.ch0_neg_mode = 1;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.thr_l_lim_en = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.thr_h_lim_en = 1;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.thr_zero_en = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.thr_thres0_en = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf0.thr_thres1_en = 0;
PCNT.conf_unit[REF_CLOCK_PCNT_UNIT].conf2.cnt_h_lim = REF_CLOCK_PRESCALER_MS * 1000;
// Enable PCNT and wait for it to start counting
PCNT.ctrl.val &= ~(BIT(REF_CLOCK_PCNT_UNIT * 2 + 1));
PCNT.ctrl.val |= BIT(REF_CLOCK_PCNT_UNIT * 2);
PCNT.ctrl.val &= ~BIT(REF_CLOCK_PCNT_UNIT * 2);
ets_delay_us(10000);
// Enable interrupt
s_milliseconds = 0;
ESP_ERROR_CHECK(esp_intr_alloc(ETS_PCNT_INTR_SOURCE, ESP_INTR_FLAG_IRAM, pcnt_isr, NULL, &s_intr_handle));
PCNT.int_clr.val = BIT(REF_CLOCK_PCNT_UNIT);
PCNT.int_ena.val = BIT(REF_CLOCK_PCNT_UNIT);
}
static void IRAM_ATTR pcnt_isr(void* arg)
{
portENTER_CRITICAL(&s_lock);
PCNT.int_clr.val = BIT(REF_CLOCK_PCNT_UNIT);
s_milliseconds += REF_CLOCK_PRESCALER_MS;
portEXIT_CRITICAL(&s_lock);
}
void ref_clock_deinit()
{
assert(s_intr_handle && "deinit called without init");
// Disable interrupt
PCNT.int_ena.val &= ~BIT(REF_CLOCK_PCNT_UNIT);
esp_intr_free(s_intr_handle);
s_intr_handle = NULL;
// Disable RMT
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf1.tx_start = 0;
RMT.conf_ch[REF_CLOCK_RMT_CHANNEL].conf0.clk_en = 0;
periph_module_disable(PERIPH_RMT_MODULE);
// Disable PCNT
PCNT.ctrl.val |= ~(BIT(REF_CLOCK_PCNT_UNIT * 2 + 1));
periph_module_disable(PERIPH_PCNT_MODULE);
}
uint64_t ref_clock_get()
{
portENTER_CRITICAL(&s_lock);
uint32_t microseconds = PCNT.cnt_unit[REF_CLOCK_PCNT_UNIT].cnt_val;
uint32_t milliseconds = s_milliseconds;
if (PCNT.int_st.val & BIT(REF_CLOCK_PCNT_UNIT)) {
// refresh counter value, in case the overflow has happened after reading cnt_val
microseconds = PCNT.cnt_unit[REF_CLOCK_PCNT_UNIT].cnt_val;
milliseconds += REF_CLOCK_PRESCALER_MS;
}
portEXIT_CRITICAL(&s_lock);
return 1000 * (uint64_t) milliseconds + (uint64_t) microseconds;
}
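Illustrative sketch (not part of this commit): timing a short delay with the reference clock implemented above.

/* Sketch only: assumes <stdio.h> and the ref_clock functions above. */
static void example_measure_with_ref_clock(void)
{
    ref_clock_init();
    uint64_t t_start = ref_clock_get();              /* microseconds since init */
    ets_delay_us(1000);                              /* stand-in for the operation under test */
    uint64_t elapsed_us = ref_clock_get() - t_start;
    printf("operation took %llu us\n", (unsigned long long)elapsed_us);
    ref_clock_deinit();
}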


@ -96,7 +96,7 @@ void tearDown(void)
Unity.TestFile = __FILE__;
/* check if unit test has caused heap corruption in any heap */
TEST_ASSERT_MESSAGE( heap_caps_check_integrity(MALLOC_CAP_INVALID, true), "The test has corrupted the heap");
//TEST_ASSERT_MESSAGE( heap_caps_check_integrity(MALLOC_CAP_INVALID, true), "The test has corrupted the heap");
/* check for leaks */
#ifdef CONFIG_HEAP_TRACING


@ -1,3 +0,0 @@
TEST_EXCLUDE_COMPONENTS=libsodium bt app_update
TEST_COMPONENTS=mbedtls
CONFIG_MBEDTLS_HARDWARE_AES=n


@ -1,13 +0,0 @@
TEST_COMPONENTS=app_update
TEST_EXCLUDE_COMPONENTS=libsodium bt
CONFIG_UNITY_FREERTOS_STACK_SIZE=12288
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partition_table_unit_test_two_ota.csv"
CONFIG_PARTITION_TABLE_FILENAME="partition_table_unit_test_two_ota.csv"
CONFIG_PARTITION_TABLE_OFFSET=0x18000
CONFIG_BOOTLOADER_FACTORY_RESET=y
CONFIG_BOOTLOADER_APP_TEST=y
CONFIG_BOOTLOADER_HOLD_TIME_GPIO=2
CONFIG_BOOTLOADER_OTA_DATA_ERASE=y
CONFIG_BOOTLOADER_NUM_PIN_FACTORY_RESET=4
CONFIG_BOOTLOADER_NUM_PIN_APP_TEST=32


@ -1,4 +0,0 @@
TEST_COMPONENTS=bt
TEST_EXCLUDE_COMPONENTS=app_update
CONFIG_BT_ENABLED=y
CONFIG_UNITY_FREERTOS_STACK_SIZE=12288


@ -1 +0,0 @@
TEST_EXCLUDE_COMPONENTS=libsodium bt app_update


@ -1,3 +0,0 @@
TEST_COMPONENTS=libsodium
TEST_EXCLUDE_COMPONENTS=bt app_update
CONFIG_UNITY_FREERTOS_STACK_SIZE=12288


@ -1,3 +0,0 @@
TEST_EXCLUDE_COMPONENTS=libsodium bt app_update driver esp32 spi_flash
CONFIG_SPIRAM_SUPPORT=y
CONFIG_SPIRAM_BANKSWITCH_ENABLE=n


@ -1,3 +0,0 @@
TEST_COMPONENTS=driver esp32 spi_flash
CONFIG_SPIRAM_SUPPORT=y
CONFIG_SPIRAM_BANKSWITCH_ENABLE=n


@ -1,4 +0,0 @@
TEST_COMPONENTS=esp32
CONFIG_SPIRAM_SUPPORT=y
CONFIG_SPIRAM_BANKSWITCH_ENABLE=y
CONFIG_SPIRAM_BANKSWITCH_RESERVE=8


@ -1,3 +0,0 @@
TEST_EXCLUDE_COMPONENTS=bt app_update
CONFIG_OPTIMIZATION_LEVEL_RELEASE=y
CONFIG_OPTIMIZATION_ASSERTIONS_SILENT=y


@ -1,4 +0,0 @@
TEST_EXCLUDE_COMPONENTS=libsodium bt app_update
CONFIG_MEMMAP_SMP=n
CONFIG_FREERTOS_UNICORE=y
CONFIG_ESP32_RTCDATA_IN_FAST_MEM=y


@ -1,279 +0,0 @@
import sys
import glob
import tempfile
import os
import os.path
import re
import shutil
import argparse
import json
import copy
PROJECT_NAME = "unit-test-app"
PROJECT_PATH = os.getcwd()
# List of unit-test-app configurations.
# Each file in configs/ directory defines a configuration. The format is the
# same as sdkconfig file. Configuration is applied on top of sdkconfig.defaults
# file from the project directory
CONFIG_NAMES = os.listdir(os.path.join(PROJECT_PATH, "configs"))
# Build (intermediate) and output (artifact) directories
BUILDS_DIR = os.path.join(PROJECT_PATH, "builds")
BINARIES_DIR = os.path.join(PROJECT_PATH, "output")
# Convert the values passed to the -T parameter to corresponding cache entry definitions
# TESTS_ALL and TEST_COMPONENTS
class TestComponentAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
# Create a new list of cache definition entries, preserving any previous elements
cache_entries = list()
existing_entries = getattr(namespace, "define_cache_entry", [])
if existing_entries:
cache_entries.extend(existing_entries)
# Form -D arguments
if "all" in values:
cache_entries.append("TESTS_ALL=1")
cache_entries.append("TEST_COMPONENTS=''")
else:
cache_entries.append("TESTS_ALL=0")
cache_entries.append("TEST_COMPONENTS='%s'" % " ".join(values))
setattr(namespace, "define_cache_entry", cache_entries)
# Brute force add reconfigure at the very beginning
existing_actions = getattr(namespace, "actions", [])
if not "reconfigure" in existing_actions:
existing_actions = ["reconfigure"] + existing_actions
setattr(namespace, "actions", existing_actions)
class TestExcludeComponentAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
# Create a new list of cache definition entries, preserving any previous elements
cache_entries = list()
existing_entries = getattr(namespace, "define_cache_entry", [])
if existing_entries:
cache_entries.extend(existing_entries)
cache_entries.append("TEST_EXCLUDE_COMPONENTS='%s'" % " ".join(values))
setattr(namespace, "define_cache_entry", cache_entries)
# Brute force add reconfigure at the very beginning
existing_actions = getattr(namespace, "actions", [])
if not "reconfigure" in existing_actions:
existing_actions = ["reconfigure"] + existing_actions
setattr(namespace, "actions", existing_actions)
def add_argument_extensions(parser):
# For convenience, define a -T argument that gets converted to -D arguments
parser.add_argument('-T', '--test-component', help="Specify the components to test", nargs='+', action=TestComponentAction)
# Similarly, define a -E argument that gets converted to -D arguments
parser.add_argument('-E', '--test-exclude-components', help="Specify the components to exclude from testing", nargs='+', action=TestExcludeComponentAction)
def add_action_extensions(base_functions, base_actions):
def ut_apply_config(ut_apply_config_name, args):
config_name = re.match(r"ut-apply-config-(.*)", ut_apply_config_name).group(1)
def set_config_build_variables(prop, defval = None):
property_value = re.match(r"^%s=(.*)" % prop, config_file_content)
if (property_value):
property_value = property_value.group(1)
else:
property_value = defval
if (property_value):
try:
args.define_cache_entry.append("%s=" % prop + property_value)
except AttributeError:
args.define_cache_entry = ["%s=" % prop + property_value]
return property_value
sdkconfig_set = None
if args.define_cache_entry:
sdkconfig_set = filter(lambda s: "SDKCONFIG=" in s, args.define_cache_entry)
sdkconfig_path = os.path.join(args.project_dir, "sdkconfig")
if sdkconfig_set:
sdkconfig_path = sdkconfig_set[-1].split("=")[1]
sdkconfig_path = os.path.abspath(sdkconfig_path)
try:
os.remove(sdkconfig_path)
except OSError:
pass
if config_name in CONFIG_NAMES:
# Parse the sdkconfig for components to be included/excluded and tests to be run
config = os.path.join(PROJECT_PATH, "configs", config_name)
with open(config, "r") as config_file:
config_file_content = config_file.read()
set_config_build_variables("EXCLUDE_COMPONENTS", "''")
test_components = set_config_build_variables("TEST_COMPONENTS", "''")
tests_all = None
if test_components == "''":
tests_all = "TESTS_ALL=1"
else:
tests_all = "TESTS_ALL=0"
try:
args.define_cache_entry.append(tests_all)
except AttributeError:
args.define_cache_entry = [tests_all]
set_config_build_variables("TEST_EXCLUDE_COMPONENTS","''")
with tempfile.NamedTemporaryFile() as sdkconfig_temp:
# Use values from the combined defaults and the values from
# config folder to build config
sdkconfig_default = os.path.join(PROJECT_PATH, "sdkconfig.defaults")
with open(sdkconfig_default, "rb") as sdkconfig_default_file:
sdkconfig_temp.write(sdkconfig_default_file.read())
sdkconfig_config = os.path.join(PROJECT_PATH, "configs", config_name)
with open(sdkconfig_config, "rb") as sdkconfig_config_file:
sdkconfig_temp.write(b"\n")
sdkconfig_temp.write(sdkconfig_config_file.read())
sdkconfig_temp.flush()
try:
args.define_cache_entry.append("SDKCONFIG_DEFAULTS=" + sdkconfig_temp.name)
except AttributeError:
args.define_cache_entry = ["SDKCONFIG_DEFAULTS=" + sdkconfig_temp.name]
reconfigure = base_functions["reconfigure"]
reconfigure(None, args)
else:
if not config_name == "all-configs":
print("unknown unit test app config for action '%s'" % ut_apply_config_name)
# This target builds the configuration. It does not currently track dependencies,
# but is good enough for CI builds if used together with clean-all-configs.
# For local builds, use 'apply-config-NAME' target and then use normal 'all'
# and 'flash' targets.
def ut_build(ut_build_name, args):
# Create a copy of the passed arguments so that argument modifications do not
# accumulate when all configs are being built
build_args = copy.copy(args)
config_name = re.match(r"ut-build-(.*)", ut_build_name).group(1)
if config_name in CONFIG_NAMES:
build_args.build_dir = os.path.join(BUILDS_DIR, config_name)
src = os.path.join(BUILDS_DIR, config_name)
dest = os.path.join(BINARIES_DIR, config_name)
try:
os.makedirs(dest)
except OSError:
pass
# Build, tweaking paths to sdkconfig and sdkconfig.defaults
ut_apply_config("ut-apply-config-" + config_name, build_args)
build_target = base_functions["build_target"]
build_target("all", build_args)
# Copy artifacts to the output directory
shutil.copyfile(os.path.join(build_args.project_dir, "sdkconfig"), os.path.join(dest, "sdkconfig"))
binaries = [PROJECT_NAME + x for x in [".elf", ".bin", ".map"]]
for binary in binaries:
shutil.copyfile(os.path.join(src, binary), os.path.join(dest, binary))
try:
os.mkdir(os.path.join(dest, "bootloader"))
except OSError:
pass
shutil.copyfile(os.path.join(src, "bootloader", "bootloader.bin"), os.path.join(dest, "bootloader", "bootloader.bin"))
for partition_table in glob.glob(os.path.join(src, "partition_table", "partition-table*.bin")):
try:
os.mkdir(os.path.join(dest, "partition_table"))
except OSError:
pass
shutil.copyfile(partition_table, os.path.join(dest, "partition_table", os.path.basename(partition_table)))
shutil.copyfile(os.path.join(src, "flash_project_args"), os.path.join(dest, "flash_project_args"))
binaries = glob.glob(os.path.join(src, "*.bin"))
binaries = [os.path.basename(s) for s in binaries]
for binary in binaries:
shutil.copyfile(os.path.join(src, binary), os.path.join(dest, binary))
else:
if not config_name == "all-configs":
print("unknown unit test app config for action '%s'" % ut_build_name)
def ut_clean(ut_clean_name, args):
config_name = re.match(r"ut-clean-(.*)", ut_clean_name).group(1)
if config_name in CONFIG_NAMES:
shutil.rmtree(os.path.join(BUILDS_DIR, config_name), ignore_errors=True)
shutil.rmtree(os.path.join(BINARIES_DIR, config_name), ignore_errors=True)
else:
if not config_name == "all-configs":
print("unknown unit test app config for action '%s'" % ut_clean_name)
def ut_help(action, args):
HELP_STRING = """
Additional unit-test-app specific targets
idf.py ut-build-NAME - Build unit-test-app with configuration provided in configs/NAME.
Build directory will be builds/NAME/, output binaries will be
under output/NAME/
idf.py ut-clean-NAME - Remove build and output directories for configuration NAME.
idf.py ut-build-all-configs - Build all configurations defined in configs/ directory.
idf.py ut-apply-config-NAME - Generates configuration based on configs/NAME in sdkconfig
file. After this, normal all/flash targets can be used.
Useful for development/debugging.
"""
print(HELP_STRING)
# Build dictionary of action extensions
extensions = dict()
# This generates per-config targets (clean, build, apply-config).
build_all_config_deps = []
clean_all_config_deps = []
for config in CONFIG_NAMES:
config_build_action_name = "ut-build-" + config
config_clean_action_name = "ut-clean-" + config
config_apply_config_action_name = "ut-apply-config-" + config
extensions[config_build_action_name] = (ut_build, [], [])
extensions[config_clean_action_name] = (ut_clean, [], [])
extensions[config_apply_config_action_name] = (ut_apply_config, [], [])
build_all_config_deps.append(config_build_action_name)
clean_all_config_deps.append(config_clean_action_name)
extensions["ut-build-all-configs"] = (ut_build, build_all_config_deps, [])
extensions["ut-clean-all-configs"] = (ut_clean, clean_all_config_deps, [])
extensions["ut-help"] = (ut_help, [], [])
base_actions.update(extensions)


@ -5,13 +5,10 @@
nvs, data, nvs, 0x9000, 0x4000
otadata, data, ota, 0xd000, 0x2000
phy_init, data, phy, 0xf000, 0x1000
factory, 0, 0, 0x10000, 0x240000
# these OTA partitions are used for tests, but can't fit real OTA apps in them
# (done this way to reduce total flash usage.)
factory, 0, 0, 0x10000, 0xF0000
ota_0, 0, ota_0, , 64K
ota_1, 0, ota_1, , 64K
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 528K
nvs_key, data, nvs_keys, , 0x1000, encrypted
# Note: still 1MB of a 4MB flash left free for some other purpose



@ -1,11 +0,0 @@
# Special partition table for unit test app_update
# Name, Type, SubType, Offset, Size, Flags
nvs, data, nvs, , 0x4000
otadata, data, ota, , 0x2000
phy_init, data, phy, , 0x1000
factory, 0, 0, , 0xB0000
ota_0, 0, ota_0, , 0xB0000
ota_1, 0, ota_1, , 0xB0000
test, 0, test, , 0xB0000
# flash_test partition used for SPI flash tests, WL FAT tests, and SPIFFS tests
flash_test, data, fat, , 528K


@ -1,30 +1,7 @@
CONFIG_LOG_BOOTLOADER_LEVEL_WARN=y
CONFIG_ESPTOOLPY_BAUD_921600B=y
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=y
CONFIG_ESPTOOLPY_FLASHSIZE_DETECT=n
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partition_table_unit_test_app.csv"
CONFIG_PARTITION_TABLE_FILENAME="partition_table_unit_test_app.csv"
CONFIG_PARTITION_TABLE_OFFSET=0x8000
CONFIG_ESP32_DEFAULT_CPU_FREQ_240=y
CONFIG_ESP32_XTAL_FREQ_AUTO=y
CONFIG_FREERTOS_HZ=1000
CONFIG_FREERTOS_WATCHPOINT_END_OF_STACK=y
CONFIG_FREERTOS_THREAD_LOCAL_STORAGE_POINTERS=3
CONFIG_FREERTOS_USE_TRACE_FACILITY=y
CONFIG_HEAP_POISONING_COMPREHENSIVE=y
CONFIG_MBEDTLS_HARDWARE_MPI=y
CONFIG_MBEDTLS_MPI_USE_INTERRUPT=y
CONFIG_MBEDTLS_HARDWARE_SHA=y
CONFIG_SPI_FLASH_ENABLE_COUNTERS=y
CONFIG_ULP_COPROC_ENABLED=y
CONFIG_TASK_WDT=n
CONFIG_SPI_FLASH_WRITING_DANGEROUS_REGIONS_FAILS=y
CONFIG_FREERTOS_QUEUE_REGISTRY_SIZE=7
CONFIG_STACK_CHECK_STRONG=y
CONFIG_STACK_CHECK=y
CONFIG_SUPPORT_STATIC_ALLOCATION=y
CONFIG_ESP_TIMER_PROFILING=y
CONFIG_ADC2_DISABLE_DAC=n
CONFIG_WARN_WRITE_STRINGS=y
CONFIG_SPI_MASTER_IN_IRAM=y
CONFIG_TASK_WDT=
CONFIG_ENABLE_PTHREAD=y


@ -1,2 +0,0 @@
"psram": '{CONFIG_SPIRAM_SUPPORT=y} and not {CONFIG_SPIRAM_BANKSWITCH_ENABLE=y}'
"8Mpsram": "CONFIG_SPIRAM_BANKSWITCH_ENABLE=y"


@ -1,163 +0,0 @@
# This file is used to process section data generated by `objdump -s`
import re
class Section(object):
"""
One Section of section table. contains info about section name, address and raw data
"""
SECTION_START_PATTERN = re.compile(b"Contents of section (.+?):")
DATA_PATTERN = re.compile(b"([0-9a-f]{4,8})")
def __init__(self, name, start_address, data):
self.name = name
self.start_address = start_address
self.data = data
def __contains__(self, item):
""" check if the section name and address match this section """
if (item["section"] == self.name or item["section"] == "any") \
and (self.start_address <= item["address"] < (self.start_address + len(self.data))):
return True
else:
return False
def __getitem__(self, item):
"""
process slice.
convert absolute address to relative address in current section and return slice result
"""
if isinstance(item, int):
return self.data[item - self.start_address]
elif isinstance(item, slice):
start = item.start if item.start is None else item.start - self.start_address
stop = item.stop if item.stop is None else item.stop - self.start_address
return self.data[start:stop]
return self.data[item]
def __str__(self):
return "%s [%08x - %08x]" % (self.name, self.start_address, self.start_address + len(self.data))
__repr__ = __str__
@classmethod
def parse_raw_data(cls, raw_data):
"""
process raw data generated by `objdump -s`, create section and return un-processed lines
:param raw_data: lines of raw data generated by `objdump -s`
:return: one section, un-processed lines
"""
name = ""
data = ""
start_address = 0
# first find start line
for i, line in enumerate(raw_data):
if b"Contents of section " in line: # do strcmp first to speed up
match = cls.SECTION_START_PATTERN.search(line)
if match is not None:
name = match.group(1)
raw_data = raw_data[i + 1:]
break
else:
# do some error handling
raw_data = [b""] # add a dummy first data line
def process_data_line(line_to_process):
# first remove the ascii part
hex_part = line_to_process.split(b" ")[0]
# process rest part
data_list = cls.DATA_PATTERN.findall(hex_part)
try:
_address = int(data_list[0], base=16)
except IndexError:
_address = -1
def hex_to_str(hex_data):
if len(hex_data) % 2 == 1:
hex_data = b"0" + hex_data # append zero at the beginning
_length = len(hex_data)
return "".join([chr(int(hex_data[_i:_i + 2], base=16))
for _i in range(0, _length, 2)])
return _address, "".join([hex_to_str(x) for x in data_list[1:]])
# handle first line:
address, _data = process_data_line(raw_data[0])
if address != -1:
start_address = address
data += _data
raw_data = raw_data[1:]
for i, line in enumerate(raw_data):
address, _data = process_data_line(line)
if address == -1:
raw_data = raw_data[i:]
break
else:
data += _data
else:
# do error handling
raw_data = []
section = cls(name, start_address, data) if start_address != -1 else None
unprocessed_data = None if len(raw_data) == 0 else raw_data
return section, unprocessed_data
class SectionTable(object):
""" elf section table """
def __init__(self, file_name):
with open(file_name, "rb") as f:
raw_data = f.readlines()
self.table = []
while raw_data:
section, raw_data = Section.parse_raw_data(raw_data)
self.table.append(section)
def get_unsigned_int(self, section, address, size=4, endian="LE"):
"""
get unsigned int from section table
:param section: section name; use "any" will only match with address
:param address: start address
:param size: size in bytes
:param endian: LE or BE
:return: int or None
"""
if address % 4 != 0 or size % 4 != 0:
print("warning: try to access without 4 bytes aligned")
key = {"address": address, "section": section}
for section in self.table:
if key in section:
tmp = section[address:address+size]
value = 0
for i in range(size):
if endian == "LE":
value += ord(tmp[i]) << (i*8)
elif endian == "BE":
value += ord(tmp[i]) << ((size - i - 1) * 8)
else:
print("only support LE or BE for parameter endian")
assert False
break
else:
value = None
return value
def get_string(self, section, address):
"""
get string ('\0' terminated) from section table
:param section: section name; use "any" will only match with address
:param address: start address
:return: string or None
"""
value = None
key = {"address": address, "section": section}
for section in self.table:
if key in section:
value = section[address:]
for i, c in enumerate(value):
if c == '\0':
value = value[:i]
break
break
return value


@ -1,127 +0,0 @@
freertos:
module: System
module abbr: SYS
sub module: OS
sub module abbr: OS
nvs:
module: System
module abbr: SYS
sub module: NVS
sub module abbr: NVS
partition:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
ulp:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
fp:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
hw:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
tjpgd:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
miniz:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
mmap:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
bignum:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
newlib:
module: System
module abbr: SYS
sub module: Std Lib
sub module abbr: STD
aes:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
mbedtls:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
spi_flash:
module: Driver
module abbr: DRV
sub module: SPI
sub module abbr: SPI
spi_flash_read:
module: Driver
module abbr: DRV
sub module: SPI
sub module abbr: SPI
spi_flash_write:
module: Driver
module abbr: DRV
sub module: SPI
sub module abbr: SPI
esp32:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
deepsleep:
module: RTC
module abbr: RTC
sub module: Deep Sleep
sub module abbr: SLEEP
sd:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
cxx:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
fatfs:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
delay:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
spi:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
vfs:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC
misc:
module: System
module abbr: SYS
sub module: Misc
sub module abbr: MISC


@ -1,20 +0,0 @@
ignore:
# if the tag exists but no value is assigned
default: "Yes"
# if the tag does not exist in the tag list
omitted: "No"
test_env:
default: "UT_T1_1"
omitted: "UT_T1_1"
reset:
default: "POWERON_RESET"
omitted: " "
multi_device:
default: "Yes"
omitted: "No"
multi_stage:
default: "Yes"
omitted: "No"
timeout:
default: 30
omitted: 30


@ -1,318 +0,0 @@
from __future__ import print_function
import yaml
import os
import re
import shutil
import subprocess
from copy import deepcopy
import CreateSectionTable
TEST_CASE_PATTERN = {
"initial condition": "UTINIT1",
"SDK": "ESP32_IDF",
"level": "Unit",
"execution time": 0,
"auto test": "Yes",
"category": "Function",
"test point 1": "basic function",
"version": "v1 (2016-12-06)",
"test environment": "UT_T1_1",
"reset": "",
"expected result": "1. set succeed",
"cmd set": "test_unit_test_case",
"Test App": "UT",
}
class Parser(object):
""" parse unit test cases from build files and create files for test bench """
TAG_PATTERN = re.compile("([^=]+)(=)?(.+)?")
DESCRIPTION_PATTERN = re.compile("\[([^]\[]+)\]")
CONFIG_PATTERN = re.compile(r"{([^}]+)}")
# file path (relative to idf path)
TAG_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "TagDefinition.yml")
MODULE_DEF_FILE = os.path.join("tools", "unit-test-app", "tools", "ModuleDefinition.yml")
CONFIG_DEPENDENCY_FILE = os.path.join("tools", "unit-test-app", "tools", "ConfigDependency.yml")
MODULE_ARTIFACT_FILE = os.path.join("components", "idf_test", "ModuleDefinition.yml")
TEST_CASE_FILE = os.path.join("components", "idf_test", "unit_test", "TestCaseAll.yml")
UT_BIN_FOLDER = os.path.join("tools", "unit-test-app", "output")
ELF_FILE = "unit-test-app.elf"
SDKCONFIG_FILE = "sdkconfig"
def __init__(self, idf_path=os.getenv("IDF_PATH")):
self.test_env_tags = {}
self.unit_jobs = {}
self.file_name_cache = {}
self.idf_path = idf_path
self.tag_def = yaml.load(open(os.path.join(idf_path, self.TAG_DEF_FILE), "r"))
self.module_map = yaml.load(open(os.path.join(idf_path, self.MODULE_DEF_FILE), "r"))
self.config_dependencies = yaml.load(open(os.path.join(idf_path, self.CONFIG_DEPENDENCY_FILE), "r"))
# used to check if duplicated test case names
self.test_case_names = set()
self.parsing_errors = []
def parse_test_cases_for_one_config(self, config_output_folder, config_name):
"""
parse test cases from the ELF file and save the test cases that need to be executed to the unit test folder
:param config_output_folder: build folder of this config
:param config_name: built unit test config name
"""
elf_file = os.path.join(config_output_folder, self.ELF_FILE)
subprocess.check_output('xtensa-esp32-elf-objdump -t {} | grep test_desc > case_address.tmp'.format(elf_file),
shell=True)
subprocess.check_output('xtensa-esp32-elf-objdump -s {} > section_table.tmp'.format(elf_file), shell=True)
table = CreateSectionTable.SectionTable("section_table.tmp")
tags = self.parse_tags(os.path.join(config_output_folder, self.SDKCONFIG_FILE))
test_cases = []
with open("case_address.tmp", "rb") as f:
for line in f:
# process symbol table like: "3ffb4310 l O .dram0.data 00000018 test_desc_33$5010"
line = line.split()
test_addr = int(line[0], 16)
section = line[3]
name_addr = table.get_unsigned_int(section, test_addr, 4)
desc_addr = table.get_unsigned_int(section, test_addr + 4, 4)
file_name_addr = table.get_unsigned_int(section, test_addr + 12, 4)
function_count = table.get_unsigned_int(section, test_addr+20, 4)
name = table.get_string("any", name_addr)
desc = table.get_string("any", desc_addr)
file_name = table.get_string("any", file_name_addr)
tc = self.parse_one_test_case(name, desc, file_name, config_name, tags)
# check for duplicated case names:
# the name is used to select the case, and with duplicated IDs Unity could select the wrong case to run,
# so all cases are checked regardless of whether CI is going to execute them.
# The config name is appended because the same case is allowed in different apps.
if (tc["summary"] + config_name) in self.test_case_names:
self.parsing_errors.append("duplicated test case ID: " + tc["summary"])
else:
self.test_case_names.add(tc["summary"] + config_name)
if tc["CI ready"] == "Yes":
# update test env list and the cases of same env list
if tc["test environment"] in self.test_env_tags:
self.test_env_tags[tc["test environment"]].append(tc["ID"])
else:
self.test_env_tags.update({tc["test environment"]: [tc["ID"]]})
if function_count > 1:
tc.update({"child case num": function_count})
# only add cases that need to be executed
test_cases.append(tc)
os.remove("section_table.tmp")
os.remove("case_address.tmp")
return test_cases
def parse_case_properities(self, tags_raw):
"""
parse test case tags (properties) with the following rules:
* the first tag is always the group of test cases; it is mandatory
* the rest of the tags should be [type=value]
* if the type has a default value, then [type] is equal to [type=default_value]
* if the type does not exist, it is treated as [type=omitted_value]
default_value and omitted_value are defined in TagDefinition.yml
:param tags_raw: raw tag string
:return: tag dict
"""
tags = self.DESCRIPTION_PATTERN.findall(tags_raw)
assert len(tags) > 0
p = dict([(k, self.tag_def[k]["omitted"]) for k in self.tag_def])
p["module"] = tags[0]
if p["module"] not in self.module_map:
p["module"] = "misc"
# parsing rest tags, [type=value], =value is optional
for tag in tags[1:]:
match = self.TAG_PATTERN.search(tag)
assert match is not None
tag_type = match.group(1)
tag_value = match.group(3)
if match.group(2) == "=" and tag_value is None:
# [tag_type=] means tag_value is empty string
tag_value = ""
if tag_type in p:
if tag_value is None:
p[tag_type] = self.tag_def[tag_type]["default"]
else:
p[tag_type] = tag_value
else:
# ignore tag types that are not defined
pass
return p
@staticmethod
def parse_tags_internal(sdkconfig, config_dependencies, config_pattern):
required_tags = []
def compare_config(config):
return config in sdkconfig
def process_condition(condition):
matches = config_pattern.findall(condition)
if matches:
for config in matches:
compare_result = compare_config(config)
# replace all configs in condition with True or False according to compare result
condition = re.sub(config_pattern, str(compare_result), condition, count=1)
# Now the condition is a python condition, we can use eval to compute its value
ret = eval(condition)
else:
# didn't use complex condition. only defined one condition for the tag
ret = compare_config(condition)
return ret
for tag in config_dependencies:
if process_condition(config_dependencies[tag]):
required_tags.append(tag)
return required_tags
def parse_tags(self, sdkconfig_file):
"""
Some test configs could require different DUTs.
For example, if CONFIG_SPIRAM_SUPPORT is enabled, we need a WROVER-Kit to run the test.
This method will get tags for runners according to ConfigDependency.yml (which maps tags to sdkconfig options).
We support the following syntax::
# define the config which requires the tag
'tag_a': 'config_a="value_a"'
# define the condition for the tag
'tag_b': '{config A} and (not {config B} or (not {config C} and {config D}))'
:param sdkconfig_file: sdk config file of the unit test config
:return: required tags for runners
"""
with open(sdkconfig_file, "r") as f:
configs_raw_data = f.read()
configs = configs_raw_data.splitlines(False)
return self.parse_tags_internal(configs, self.config_dependencies, self.CONFIG_PATTERN)
def parse_one_test_case(self, name, description, file_name, config_name, tags):
"""
parse one test case
:param name: test case name (summary)
:param description: test case description (tag string)
:param file_name: the file defines this test case
:param config_name: built unit test app name
:param tags: tags to select runners
:return: parsed test case
"""
prop = self.parse_case_properities(description)
test_case = deepcopy(TEST_CASE_PATTERN)
test_case.update({"config": config_name,
"module": self.module_map[prop["module"]]['module'],
"CI ready": "No" if prop["ignore"] == "Yes" else "Yes",
"ID": name,
"test point 2": prop["module"],
"steps": name,
"test environment": prop["test_env"],
"reset": prop["reset"],
"sub module": self.module_map[prop["module"]]['sub module'],
"summary": name,
"multi_device": prop["multi_device"],
"multi_stage": prop["multi_stage"],
"timeout": int(prop["timeout"]),
"tags": tags})
return test_case
def dump_test_cases(self, test_cases):
"""
dump parsed test cases to YAML file for test bench input
:param test_cases: parsed test cases
"""
with open(os.path.join(self.idf_path, self.TEST_CASE_FILE), "w+") as f:
yaml.dump({"test cases": test_cases}, f, allow_unicode=True, default_flow_style=False)
def copy_module_def_file(self):
""" copy module def file to artifact path """
src = os.path.join(self.idf_path, self.MODULE_DEF_FILE)
dst = os.path.join(self.idf_path, self.MODULE_ARTIFACT_FILE)
shutil.copy(src, dst)
def parse_test_cases(self):
""" parse test cases from multiple built unit test apps """
test_cases = []
output_folder = os.path.join(self.idf_path, self.UT_BIN_FOLDER)
test_configs = os.listdir(output_folder)
for config in test_configs:
config_output_folder = os.path.join(output_folder, config)
if os.path.exists(config_output_folder):
test_cases.extend(self.parse_test_cases_for_one_config(config_output_folder, config))
test_cases.sort(key=lambda x: x["config"] + x["summary"])
self.dump_test_cases(test_cases)
def test_parser():
parser = Parser()
# test parsing tags
# parsing module only and module in module list
prop = parser.parse_case_properities("[esp32]")
assert prop["module"] == "esp32"
# module not in module list
prop = parser.parse_case_properities("[not_in_list]")
assert prop["module"] == "misc"
# parsing a default tag, a tag with assigned value
prop = parser.parse_case_properities("[esp32][ignore][test_env=ABCD][not_support1][not_support2=ABCD]")
assert prop["ignore"] == "Yes" and prop["test_env"] == "ABCD" \
and "not_support1" not in prop and "not_supported2" not in prop
# parsing omitted value
prop = parser.parse_case_properities("[esp32]")
assert prop["ignore"] == "No" and prop["test_env"] == "UT_T1_1"
# parsing with incorrect format
try:
parser.parse_case_properities("abcd")
assert False
except AssertionError:
pass
# skip invalid data parse, [type=] assigns empty string to type
prop = parser.parse_case_properities("[esp32]abdc aaaa [ignore=]")
assert prop["module"] == "esp32" and prop["ignore"] == ""
# skip mis-paired []
prop = parser.parse_case_properities("[esp32][[ignore=b]][]][test_env=AAA]]")
assert prop["module"] == "esp32" and prop["ignore"] == "b" and prop["test_env"] == "AAA"
config_dependency = {
'a': '123',
'b': '456',
'c': 'not {123}',
'd': '{123} and not {456}',
'e': '{123} and not {789}',
'f': '({123} and {456}) or ({123} and {789})'
}
sdkconfig = ["123", "789"]
tags = parser.parse_tags_internal(sdkconfig, config_dependency, parser.CONFIG_PATTERN)
assert sorted(tags) == ['a', 'd', 'f'] # sorted is required for older Python3, e.g. 3.4.8
def main():
test_parser()
idf_path = os.getenv("IDF_PATH")
parser = Parser(idf_path)
parser.parse_test_cases()
parser.copy_module_def_file()
if len(parser.parsing_errors) > 0:
for error in parser.parsing_errors:
print(error)
exit(-1)
if __name__ == '__main__':
main()


@ -1,748 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test script for unit test case.
"""
import re
import os
import sys
import time
import argparse
import threading
# If we want to run test cases outside the `tiny-test-fw` folder,
# we need to insert the tiny-test-fw path into sys.path.
test_fw_path = os.getenv("TEST_FW_PATH")
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0, test_fw_path)
import TinyFW
import IDF
import Utility
import Env
from DUT import ExpectTimeout
from IDF.IDFApp import UT
UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)")
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
END_LIST_STR = r'\r?\nEnter test for running'
TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?')
TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))')
SIMPLE_TEST_ID = 0
MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2
DEFAULT_TIMEOUT = 20
DUT_STARTUP_CHECK_RETRY_COUNT = 5
TEST_HISTROY_CHECK_TIMEOUT = 1
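# Illustrative sketch (hypothetical menu line, not captured from a real device): the unit test
# app prints one menu line per case in roughly the form below; TEST_PATTERN extracts the index,
# the quoted case name and the trailing tag string, from which detect_update_unit_test_info()
# later derives the timeout and the multi_stage/multi_device type.
_SAMPLE_MENU_LINE = '(1) "UART read write" [uart][timeout=60][multi_stage]\r\n'
_m = TEST_PATTERN.search(_SAMPLE_MENU_LINE)
assert _m and _m.group(2) == "UART read write" and "[multi_stage]" in _m.group(3)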
def format_test_case_config(test_case_data):
"""
convert the test case data to unified format.
    We need the following info to run unit test cases:
1. unit test app config
2. test case name
3. test case reset info
the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
Each test case is a dict with "name" and "reset" as keys. For example::
case_config = {
"default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
"psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
}
    If config is not specified for a test case, the default config "default" will be used.
:param test_case_data: string, list, or a dictionary list
:return: formatted data
"""
case_config = dict()
def parse_case(one_case_data):
""" parse and format one case """
def process_reset_list(reset_list):
# strip space and remove white space only items
_output = list()
for _r in reset_list:
_data = _r.strip(" ")
if _data:
_output.append(_data)
return _output
_case = dict()
if isinstance(one_case_data, str):
_temp = one_case_data.split(" [reset=")
_case["name"] = _temp[0]
try:
_case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
except IndexError:
_case["reset"] = list()
elif isinstance(one_case_data, dict):
_case = one_case_data.copy()
assert "name" in _case
if "reset" not in _case:
_case["reset"] = list()
else:
if isinstance(_case["reset"], str):
_case["reset"] = process_reset_list(_case["reset"].split(","))
else:
raise TypeError("Not supported type during parsing unit test case")
if "config" not in _case:
_case["config"] = "default"
return _case
if not isinstance(test_case_data, list):
test_case_data = [test_case_data]
for case_data in test_case_data:
parsed_case = parse_case(case_data)
try:
case_config[parsed_case["config"]].append(parsed_case)
except KeyError:
case_config[parsed_case["config"]] = [parsed_case]
return case_config
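# Illustrative usage sketch (hypothetical case names, not part of the original script):
# format_test_case_config() accepts a string, a dict or a list of either, and groups the parsed
# cases by unit test app config.
_example_cases = format_test_case_config([
    "UART read write",
    {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"},
])
assert _example_cases["default"][0]["reset"] == []
assert _example_cases["psram"][0]["reset"] == ["SW_CPU_RESET"]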
def replace_app_bin(dut, name, new_app_bin):
if new_app_bin is None:
return
search_pattern = '/{}.bin'.format(name)
for i, config in enumerate(dut.download_config):
if config.endswith(search_pattern):
dut.download_config[i] = new_app_bin
Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
break
def reset_dut(dut):
dut.reset()
    # The esptool ``run`` command takes quite a long time.
    # The serial port is closed before the reset finishes, so the DUT may already have booted up
    # before the serial port is reopened; this can make checking the boot-up print fail.
    # Instead we send the input command `-` and check the test history to confirm the DUT has booted.
    # We'll retry this step a few times, in case `dut.reset` returns while the DUT is still booting
    # (when the DUT can't process any command yet).
for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
dut.write("-")
try:
dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTROY_CHECK_TIMEOUT)
break
except ExpectTimeout:
pass
else:
raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
def run_one_normal_case(dut, one_case, junit_test_case, failed_cases):
reset_dut(dut)
dut.start_capture_raw_data()
# run test case
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
exception_reset_list = []
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
test_finish = list()
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
test_finish.append(True)
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
assert not exception_reset_list
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
one_case_finish(not int(data[0]))
def handle_reset_finish(data):
""" reset happened and reboot finished """
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
one_case_finish(result)
while not test_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_reset_finish),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
junit_test_case.add_error_info("timeout")
one_case_finish(False)
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_unit_test_cases(env, extra_data):
"""
    extra_data can be one of three types of value
1. as string:
1. "case_name"
2. "case_name [reset=RESET_REASON]"
2. as dict:
1. with key like {"name": "Intr_alloc test, shared ints"}
2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
3. as list of string or dict:
[case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
case_config = format_test_case_config(extra_data)
    # We don't want to stop on a failed case (unless it is a special scenario we can't handle).
    # This list records every case that fails during execution;
    # before the test function exits, it is used to report the failures and fail the whole run.
failed_cases = []
for ut_config in case_config:
Utility.console_log("Running unit test for config: " + ut_config, "O")
dut = env.get_dut("unit-test-app", app_path=ut_config)
if len(case_config[ut_config]) > 0:
replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
dut.start_app()
Utility.console_log("Download finished, start running test cases", "O")
for one_case in case_config[ut_config]:
# create junit report test case
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_normal_case(dut, one_case, junit_test_case, failed_cases)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
# raise exception if any case fails
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
for _case_name in failed_cases:
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
self.dut = dut
self.sent_signal_list = sent_signal_list
self.lock = lock
self.parent_case_name = parent_case_name
self.child_case_name = ""
self.child_case_index = child_case_index + 1
self.finish = False
self.result = False
self.output = ""
self.fail_name = None
self.timeout = timeout
        self.force_stop = threading.Event()  # used to signal this handler thread to stop running
        reset_dut(self.dut)  # reset the board to make it start from the beginning
threading.Thread.__init__(self, name="{} Handler".format(dut))
def run(self):
self.dut.start_capture_raw_data()
def get_child_case_name(data):
self.child_case_name = data[0]
time.sleep(1)
self.dut.write(str(self.child_case_index))
def one_device_case_finish(result):
""" one test finished, let expect loop break and log result """
self.finish = True
self.result = result
self.output = "[{}]\n\n{}\n".format(self.child_case_name,
self.dut.stop_capture_raw_data())
if not result:
self.fail_name = self.child_case_name
def device_wait_action(data):
start_time = time.time()
expected_signal = data[0]
while 1:
if time.time() > start_time + self.timeout:
Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
break
with self.lock:
if expected_signal in self.sent_signal_list:
self.dut.write(" ")
self.sent_signal_list.remove(expected_signal)
break
time.sleep(0.01)
def device_send_action(data):
with self.lock:
self.sent_signal_list.append(data[0].encode('utf-8'))
def handle_device_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + self.child_case_name, color="orange")
one_device_case_finish(not int(data[0]))
try:
time.sleep(1)
self.dut.write("\"{}\"".format(self.parent_case_name))
self.dut.expect("Running " + self.parent_case_name + "...")
except ExpectTimeout:
Utility.console_log("No case detected!", color="orange")
while not self.finish and not self.force_stop.isSet():
try:
                self.dut.expect_any((re.compile(r'\(' + str(self.child_case_index) + r'\)\s"(\w+)"'),
get_child_case_name),
(self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
(self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
(self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
timeout=self.timeout)
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_device_case_finish(False)
break
def stop(self):
self.force_stop.set()
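# Illustrative sketch (hypothetical console output): in a multi-device case one DUT prints a
# "Send signal" line while its peer blocks on a matching "Waiting for signal" line; the Handler
# threads above pass the signal through the shared `sent_signal_list` (protected by `lock`) and
# unblock the waiting DUT by writing a space to it. Example device output:
#
#   DUT0: Send signal: [output high]!
#   DUT1: Waiting for signal: [output high]!
#
assert Handler.SEND_SIGNAL_PATTERN.search("Send signal: [output high]!").group(1) == "output high"
assert Handler.WAIT_SIGNAL_PATTERN.search("Waiting for signal: [output high]!").group(1) == "output high"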
def get_case_info(one_case):
parent_case = one_case["name"]
child_case_num = one_case["child case num"]
return parent_case, child_case_num
def get_dut(duts, env, name, ut_config, app_bin=None):
if name in duts:
dut = duts[name]
else:
dut = env.get_dut(name, app_path=ut_config)
duts[name] = dut
replace_app_bin(dut, "unit-test-app", app_bin)
dut.start_app() # download bin to board
return dut
def run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases, app_bin, junit_test_case):
lock = threading.RLock()
threads = []
send_signal_list = []
result = True
parent_case, case_num = get_case_info(one_case)
for i in range(case_num):
dut = get_dut(duts, env, "dut%d" % i, ut_config, app_bin)
threads.append(Handler(dut, send_signal_list, lock,
parent_case, i, one_case["timeout"]))
for thread in threads:
thread.setDaemon(True)
thread.start()
output = "Multiple Device Failed\n"
for thread in threads:
thread.join()
result = result and thread.result
output += thread.output
if not thread.result:
[thd.stop() for thd in threads]
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
junit_test_case.add_failure_info(output)
Utility.console_log("Failed: " + one_case["name"], color="red")
@IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
def run_multiple_devices_cases(env, extra_data):
"""
    extra_data can be one of two types of value
1. as dict:
e.g.
{"name": "gpio master/slave test example",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"}
    2. as a list of dicts:
e.g.
[{"name": "gpio master/slave test example1",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"},
{"name": "gpio master/slave test example2",
"child case num": 2,
"config": "release",
"env_tag": "UT_T2_1"}]
"""
failed_cases = []
case_config = format_test_case_config(extra_data)
duts = {}
for ut_config in case_config:
Utility.console_log("Running unit test for config: " + ut_config, "O")
for one_case in case_config[ut_config]:
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_devices_case(duts, ut_config, env, one_case, failed_cases,
one_case.get('app_bin'), junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
for _case_name in failed_cases:
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
def run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case):
reset_dut(dut)
dut.start_capture_raw_data()
exception_reset_list = []
for test_stage in range(one_case["child case num"]):
# select multi stage test case name
dut.write("\"{}\"".format(one_case["name"]))
dut.expect("Running " + one_case["name"] + "...")
# select test function for current stage
dut.write(str(test_stage + 1))
# we want to set this flag in callbacks (inner functions)
# use list here so we can use append to set this flag
stage_finish = list()
def last_stage():
return test_stage == one_case["child case num"] - 1
def check_reset():
if one_case["reset"]:
assert exception_reset_list # reboot but no exception/reset logged. should never happen
result = False
if len(one_case["reset"]) == len(exception_reset_list):
for i, exception in enumerate(exception_reset_list):
if one_case["reset"][i] not in exception:
break
else:
result = True
if not result:
err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
exception_reset_list)
Utility.console_log(err_msg, color="orange")
junit_test_case.add_error_info(err_msg)
else:
                # we allow omitting reset in multi stage cases
result = True
return result
# expect callbacks
def one_case_finish(result):
""" one test finished, let expect loop break and log result """
# handle test finish
result = result and check_reset()
output = dut.stop_capture_raw_data()
if result:
Utility.console_log("Success: " + one_case["name"], color="green")
else:
failed_cases.append(one_case["name"])
Utility.console_log("Failed: " + one_case["name"], color="red")
junit_test_case.add_failure_info(output)
stage_finish.append("break")
def handle_exception_reset(data):
"""
just append data to exception list.
exception list will be checked in ``handle_reset_finish``, once reset finished.
"""
exception_reset_list.append(data[0])
def handle_test_finish(data):
""" test finished without reset """
# in this scenario reset should not happen
if int(data[1]):
# case ignored
Utility.console_log("Ignored: " + one_case["name"], color="orange")
junit_test_case.add_skipped_info("ignored")
            # only a pass in the last stage is regarded as a real pass
if last_stage():
one_case_finish(not int(data[0]))
else:
Utility.console_log("test finished before enter last stage", color="orange")
one_case_finish(False)
def handle_next_stage(data):
""" reboot finished. we goto next stage """
if last_stage():
                # already at the last stage, should never go to the next stage
Utility.console_log("didn't finish at last stage", color="orange")
one_case_finish(False)
else:
stage_finish.append("continue")
while not stage_finish:
try:
dut.expect_any((RESET_PATTERN, handle_exception_reset),
(EXCEPTION_PATTERN, handle_exception_reset),
(ABORT_PATTERN, handle_exception_reset),
(FINISH_PATTERN, handle_test_finish),
(UT_APP_BOOT_UP_DONE, handle_next_stage),
timeout=one_case["timeout"])
except ExpectTimeout:
Utility.console_log("Timeout in expect", color="orange")
one_case_finish(False)
break
if stage_finish[0] == "break":
# test breaks on current stage
break
@IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
def run_multiple_stage_cases(env, extra_data):
"""
    extra_data can be two types of value
    1. as dict: mandatory keys: "name" and "child case num", optional keys: "reset" and others
    2. as list of string or dict:
       [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
:param env: test env instance
:param extra_data: the case name or case list or case dictionary
:return: None
"""
case_config = format_test_case_config(extra_data)
    # We don't want to stop on a failed case (unless it is a special scenario we can't handle).
    # This list records every case that fails during execution;
    # before the test function exits, it is used to report the failures and fail the whole run.
failed_cases = []
for ut_config in case_config:
Utility.console_log("Running unit test for config: " + ut_config, "O")
dut = env.get_dut("unit-test-app", app_path=ut_config)
if len(case_config[ut_config]) > 0:
replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
dut.start_app()
for one_case in case_config[ut_config]:
junit_test_case = TinyFW.JunitReport.create_test_case("[{}] {}".format(ut_config, one_case["name"]))
try:
run_one_multiple_stage_case(dut, one_case, failed_cases, junit_test_case)
TinyFW.JunitReport.test_case_finish(junit_test_case)
except Exception as e:
junit_test_case.add_error_info("Unexpected exception: " + str(e))
TinyFW.JunitReport.test_case_finish(junit_test_case)
# raise exception if any case fails
if failed_cases:
Utility.console_log("Failed Cases:", color="red")
for _case_name in failed_cases:
Utility.console_log("\t" + _case_name, color="red")
raise AssertionError("Unit Test Failed")
def detect_update_unit_test_info(env, extra_data, app_bin):
case_config = format_test_case_config(extra_data)
for ut_config in case_config:
dut = env.get_dut("unit-test-app", app_path=ut_config)
replace_app_bin(dut, "unit-test-app", app_bin)
dut.start_app()
reset_dut(dut)
# get the list of test cases
dut.write("")
dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
def find_update_dic(name, _t, _timeout, child_case_num=None):
for _case_data in extra_data:
if _case_data['name'] == name:
_case_data['type'] = _t
if 'timeout' not in _case_data:
_case_data['timeout'] = _timeout
if child_case_num:
_case_data['child case num'] = child_case_num
try:
while True:
data = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT)
test_case_name = data[1]
m = re.search(r'\[timeout=(\d+)\]', data[2])
if m:
timeout = int(m.group(1))
else:
timeout = 30
m = re.search(r'\[multi_stage\]', data[2])
if m:
test_case_type = MULTI_STAGE_ID
else:
m = re.search(r'\[multi_device\]', data[2])
if m:
test_case_type = MULTI_DEVICE_ID
else:
test_case_type = SIMPLE_TEST_ID
find_update_dic(test_case_name, test_case_type, timeout)
if data[3] and re.search(END_LIST_STR, data[3]):
break
continue
# find the last submenu item
data = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT)
find_update_dic(test_case_name, test_case_type, timeout, child_case_num=int(data[0]))
if data[1] and re.search(END_LIST_STR, data[1]):
break
# check if the unit test case names are correct, i.e. they could be found in the device
for _dic in extra_data:
if 'type' not in _dic:
raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
except ExpectTimeout:
Utility.console_log("Timeout during getting the test list", color="red")
finally:
dut.close()
# These options are the same for all configs, therefore there is no need to continue
break
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--repeat', '-r',
help='Number of repetitions for the test(s). Default is 1.',
type=int,
default=1
)
parser.add_argument("--env_config_file", "-e",
help="test env config file",
default=None
)
parser.add_argument("--app_bin", "-b",
help="application binary file for flashing the chip",
default=None
)
parser.add_argument(
'test',
help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", \
"config", "timeout".',
nargs='+'
)
args = parser.parse_args()
list_of_dicts = []
for test in args.test:
test_args = test.split(r',')
test_dict = dict()
for test_item in test_args:
if len(test_item) == 0:
continue
pair = test_item.split(r':')
            if len(pair) == 1:  # an item without ':' is the test case name itself
test_dict['name'] = pair[0]
elif len(pair) == 2:
if pair[0] == 'timeout' or pair[0] == 'child case num':
test_dict[pair[0]] = int(pair[1])
else:
test_dict[pair[0]] = pair[1]
else:
raise ValueError('Error in argument item {} of {}'.format(test_item, test))
test_dict['app_bin'] = args.app_bin
list_of_dicts.append(test_dict)
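    # Illustrative sketch (hypothetical case name and values): an invocation such as
    #
    #   python unit_test.py "UART read write,config:release,timeout:60" --repeat 2
    #
    # would, with the parsing above, add one entry to list_of_dicts roughly like:
    #
    #   {"name": "UART read write", "config": "release", "timeout": 60, "app_bin": None}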
TinyFW.set_default_config(env_config_file=args.env_config_file)
env_config = TinyFW.get_default_config()
env_config['app'] = UT
env_config['dut'] = IDF.IDFDUT
env_config['test_suite_name'] = 'unit_test_parsing'
test_env = Env.Env(**env_config)
detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
for index in range(1, args.repeat+1):
if args.repeat > 1:
Utility.console_log("Repetition {}".format(index), color="green")
for dic in list_of_dicts:
t = dic.get('type', SIMPLE_TEST_ID)
if t == SIMPLE_TEST_ID:
run_unit_test_cases(extra_data=dic)
elif t == MULTI_STAGE_ID:
run_multiple_stage_cases(extra_data=dic)
elif t == MULTI_DEVICE_ID:
run_multiple_devices_cases(extra_data=dic)
else:
raise ValueError('Unknown type {} of {}'.format(t, dic.get('name')))