[otci] update to support more commands (#11015)

* otci now supports all commands the Thread Test Harness requires
* improve typehinting across the board
* fixes some typos
This commit is contained in:
Thomas
2025-03-19 04:53:22 +01:00
committed by GitHub
parent 21ba5bbea8
commit ba6a803ed5
10 changed files with 649 additions and 228 deletions

View File

@ -144,8 +144,8 @@ def check_tlv_request_tlv(command_msg, check_type, tlv_id):
elif check_type == CheckType.NOT_CONTAIN:
if tlv_request_tlv is not None:
assert (any(tlv_id == tlv for tlv in tlv_request_tlv.tlvs) is
False), "Error: The msg contains TLV Request TLV ID: {}".format(tlv_id)
assert (not any(tlv_id == tlv
for tlv in tlv_request_tlv.tlvs)), f"Error: The msg contains TLV Request TLV ID: {tlv_id}"
elif check_type == CheckType.OPTIONAL:
if tlv_request_tlv is not None:

View File

@ -31,14 +31,15 @@ from . import errors
from .constants import THREAD_VERSION_1_1, THREAD_VERSION_1_2
from .command_handlers import OTCommandHandler
from .otci import OTCI
from .otci import \
connect_cli_sim, \
connect_cli_serial, \
connect_ncp_sim, \
connect_cmd_handler, \
connect_otbr_ssh, \
connect_otbr_adb_tcp, \
connect_otbr_adb_usb
from .otci import (
connect_cli_sim,
connect_cli_serial,
connect_ncp_sim,
connect_cmd_handler,
connect_otbr_ssh,
connect_otbr_adb_tcp,
connect_otbr_adb_usb,
)
from .types import Rloc16, ChildId, NetifIdentifier
@ -61,4 +62,6 @@ __all__ = [
'NetifIdentifier',
'THREAD_VERSION_1_1',
'THREAD_VERSION_1_2',
'THREAD_VERSION_1_3',
'THREAD_VERSION_1_4',
] + _connectors

View File

@ -105,13 +105,13 @@ class OtCliCommandRunner(OTCommandHandler):
__ASYNC_COMMANDS = {'scan', 'ping', 'discover'}
def __init__(self, otcli: OtCliHandler, is_spinel_cli=False):
def __init__(self, otcli: OtCliHandler, is_spinel_cli: bool = False):
self.__otcli: OtCliHandler = otcli
self.__is_spinel_cli = is_spinel_cli
self.__expect_command_echoback = not self.__is_spinel_cli
self.__line_read_callback = None
self.__pending_lines = queue.Queue()
self.__pending_lines: queue.Queue[str] = queue.Queue()
self.__should_close = threading.Event()
self.__otcli_reader = threading.Thread(target=self.__otcli_read_routine, daemon=True)
self.__otcli_reader.start()
@ -119,7 +119,7 @@ class OtCliCommandRunner(OTCommandHandler):
def __repr__(self):
return repr(self.__otcli)
def execute_command(self, cmd, timeout=10) -> List[str]:
def execute_command(self, cmd: str, timeout: float = 10) -> List[str]:
assert not self.__should_close.is_set(), "OT CLI is already closed."
self.__otcli.writeline(cmd)
@ -137,13 +137,13 @@ class OtCliCommandRunner(OTCommandHandler):
asynchronous=cmd.split()[0] in OtCliCommandRunner.__ASYNC_COMMANDS)
return output
def execute_platform_command(self, cmd, timeout=10) -> List[str]:
def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]:
raise NotImplementedError(f'Platform command is not supported on {self.__class__.__name__}')
def wait(self, duration: float) -> List[str]:
self.__otcli.wait(duration)
output = []
output: List[str] = []
try:
while True:
line = self.__pending_lines.get_nowait()
@ -166,8 +166,11 @@ class OtCliCommandRunner(OTCommandHandler):
# Private methods
#
def __expect_line(self, timeout: float, expect_line: Union[str, Pattern], asynchronous=False) -> List[str]:
output = []
def __expect_line(self,
timeout: float,
expect_line: Union[str, Pattern[str]],
asynchronous: bool = False) -> List[str]:
output: List[str] = []
if not asynchronous:
while True:
@ -222,12 +225,13 @@ class OtCliCommandRunner(OTCommandHandler):
logging.debug('%s: %s', self.__otcli, line)
if not OtCliCommandRunner.__PATTERN_LOG_LINE.match(line):
logging.info('%s: %s', self.__otcli, line)
self.__pending_lines.put(line)
class OtbrSshCommandRunner(OTCommandHandler):
def __init__(self, host, port, username, password, sudo):
def __init__(self, host: str, port: int, username: str, password: str, sudo: bool):
import paramiko
self.__host = host
@ -272,16 +276,16 @@ class OtbrSshCommandRunner(OTCommandHandler):
return output
def execute_platform_command(self, cmd, timeout=10) -> List[str]:
def execute_platform_command(self, cmd: str, timeout: float = 10) -> List[str]:
if self.__sudo:
cmd = 'sudo ' + cmd
return self.shell(cmd, timeout=timeout)
def shell(self, cmd: str, timeout: float) -> List[str]:
cmd_in, cmd_out, cmd_err = self.__ssh.exec_command(cmd, timeout=int(timeout), bufsize=1024)
errput = [l.rstrip('\r\n') for l in cmd_err.readlines()]
output = [l.rstrip('\r\n') for l in cmd_out.readlines()]
_, cmd_out, cmd_err = self.__ssh.exec_command(cmd, timeout=int(timeout), bufsize=1024)
errput = [line.rstrip('\r\n') for line in cmd_err.readlines()]
output = [line.rstrip('\r\n') for line in cmd_out.readlines()]
if errput:
raise CommandError(cmd, errput)

View File

@ -30,7 +30,7 @@ import logging
import subprocess
import time
from abc import abstractmethod, ABC
from typing import Optional
from typing import Any, Optional
class OtCliHandler(ABC):
@ -72,7 +72,7 @@ class Simulator(ABC):
class OtCliPopen(OtCliHandler):
"""Connector for OT CLI process (a Popen instance)."""
def __init__(self, proc: subprocess.Popen, nodeid: int, simulator: Simulator):
def __init__(self, proc: subprocess.Popen[Any], nodeid: int, simulator: Optional[Simulator]):
self.__otcli_proc = proc
self.__nodeid = nodeid
self.__simulator = simulator
@ -108,7 +108,7 @@ class OtCliPopen(OtCliHandler):
class OtCliSim(OtCliPopen):
"""Connector for OT CLI Simulation instances."""
def __init__(self, executable: str, nodeid: int, simulator: Simulator):
def __init__(self, executable: str, nodeid: int, simulator: Optional[Simulator]):
logging.info('%s: executable=%s', self.__class__.__name__, executable)
proc = subprocess.Popen(args=[executable, str(nodeid)],
@ -123,7 +123,7 @@ class OtCliSim(OtCliPopen):
class OtNcpSim(OtCliPopen):
"""Connector for OT NCP Simulation instances."""
def __init__(self, executable: str, nodeid: int, simulator: Simulator):
def __init__(self, executable: str, nodeid: int, simulator: Optional[Simulator]):
logging.info('%s: executable=%s', self.__class__.__name__, executable)
proc = subprocess.Popen(args=f'spinel-cli.py -p "{executable}" -n {nodeid} 2>&1',
@ -144,6 +144,7 @@ class OtCliSerial(OtCliHandler):
import serial
self.__serial = serial.Serial(self.__dev, self.__baudrate, timeout=0.1, exclusive=True)
self.writeline('\r\n')
self.__linebuffer = b''
def __repr__(self):
@ -164,7 +165,7 @@ class OtCliSerial(OtCliHandler):
return None
def writeline(self, s: str):
self.__serial.write((s + '\n').encode('utf-8'))
self.__serial.write((s + '\r\n').encode('utf-8'))
def wait(self, duration: float):
time.sleep(duration)

View File

@ -30,3 +30,5 @@
# Thread versions
THREAD_VERSION_1_1 = 2
THREAD_VERSION_1_2 = 3
THREAD_VERSION_1_3 = 4
THREAD_VERSION_1_4 = 5

View File

@ -26,7 +26,9 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from typing import List
import re
from typing import Collection, List, Pattern, Union
class OTCIError(Exception):
@ -37,15 +39,26 @@ class OTCIError(Exception):
class ExpectLineTimeoutError(OTCIError):
"""OTCI failed to find an expected line before timeout."""
def __init__(self, line):
def __init__(self, line: Union[str, Pattern[str], Collection[str]]):
super(ExpectLineTimeoutError, self).__init__("Expected line %r, but timed out" % line)
class CommandError(OTCIError):
"""OTCI failed to execute a command."""
__COMMAND_OUTPUT_ERROR_PATTERN = re.compile(r'Error (\d+): (.*)')
def __init__(self, cmd: str, output: List[str]):
self.__output = output
for line in output:
m = self.__COMMAND_OUTPUT_ERROR_PATTERN.match(line)
if not m:
continue
code, msg = m.groups()
self.code, self.msg = int(code), str(msg)
break
super(CommandError, self).__init__("Command error while executing %r:\n%s\n" % (cmd, '\n'.join(output)))
def error(self) -> str:

File diff suppressed because it is too large Load Diff

View File

@ -56,7 +56,7 @@ class PartitionId(int):
class NetifIdentifier(IntEnum):
"""Represents a network interface identifier."""
UNSPECIFIED = 0
THERAD = 1
THREAD = 1
BACKBONE = 2

View File

@ -27,10 +27,10 @@
# POSSIBILITY OF SUCH DAMAGE.
#
import functools
from typing import Union, Collection, Any, Pattern
from typing import Any, Callable, Collection, Generator, Pattern, Union
def match_line(line: str, expect_line: Union[str, Pattern, Collection[Any]]) -> bool:
def match_line(line: str, expect_line: Union[str, Pattern[Any], Collection[Any]]) -> bool:
"""Checks if a line is expected (matched by one of the given patterns)."""
if isinstance(expect_line, Pattern):
match = expect_line.match(line) is not None
@ -42,12 +42,12 @@ def match_line(line: str, expect_line: Union[str, Pattern, Collection[Any]]) ->
return match
def cached(func):
def cached(func: Callable[[Any], Any]):
"""Decorator cached makes the function to cache its result and return it in duplicate calls."""
prop_name = '__cached_' + func.__name__
prop_name = str('__cached_' + func.__name__)
@functools.wraps(func)
def _cached_func(self):
def _cached_func(self: Any):
try:
return getattr(self, prop_name)
except AttributeError:
@ -58,6 +58,16 @@ def cached(func):
return _cached_func
def constant_property(func):
def constant_property(func: Callable[[Any], Any]) -> property:
"""A constant property is a property that only evaluated once."""
return property(cached(func))
def bits_set(number: int) -> Generator[int, int, None]:
"""Find all occurrences of a pattern in a string."""
idx = 0
while number != 0:
if number & 1:
yield idx
else:
number >>= 1

View File

@ -33,6 +33,8 @@ import os
import subprocess
import unittest
from typing import cast, Dict
import otci
from otci import OTCI
from otci.errors import CommandError
@ -60,9 +62,9 @@ class TestOTCI(unittest.TestCase):
self.skipTest('not for virtual device')
if os.getenv('OTBR_SSH'):
node = otci.connect_otbr_ssh(os.getenv('OTBR_SSH'))
node = otci.connect_otbr_ssh(os.getenv('OTBR_SSH', ''))
elif os.getenv('OT_CLI_SERIAL'):
node = otci.connect_cli_serial(os.getenv('OT_CLI_SERIAL'))
node = otci.connect_cli_serial(os.getenv('OT_CLI_SERIAL', ''))
else:
self.fail("Please set OT_CLI_SERIAL or OTBR_SSH to test the real device.")
@ -85,10 +87,10 @@ class TestOTCI(unittest.TestCase):
sim = None
if os.getenv('OT_CLI'):
executable = os.getenv('OT_CLI')
executable = os.getenv('OT_CLI', '')
connector = otci.connect_cli_sim
elif os.getenv('OT_NCP'):
executable = os.getenv('OT_NCP')
executable = os.getenv('OT_NCP', '')
connector = otci.connect_ncp_sim
else:
self.fail("Please set OT_CLI to test virtual device")
@ -109,7 +111,7 @@ class TestOTCI(unittest.TestCase):
self._test_otci_multi_nodes(node1, node2, node3, node4)
def _test_otci_single_node(self, leader):
def _test_otci_single_node(self, leader: OTCI):
logging.info('leader version: %r', leader.version)
logging.info('leader thread version: %r', leader.thread_version)
logging.info('API version: %r', leader.api_version)
@ -305,7 +307,7 @@ class TestOTCI(unittest.TestCase):
leader.wait(1)
leader.coap_stop()
for netif in (NetifIdentifier.THERAD, NetifIdentifier.UNSPECIFIED, NetifIdentifier.BACKBONE):
for netif in (NetifIdentifier.THREAD, NetifIdentifier.UNSPECIFIED, NetifIdentifier.BACKBONE):
leader.udp_open()
leader.udp_bind("::", 1234, netif=netif)
leader.udp_send(leader.get_ipaddr_rloc(), 1234, text='hello')
@ -507,7 +509,7 @@ class TestOTCI(unittest.TestCase):
self.assertEqual([], server.srp_server_get_hosts())
self.assertEqual([], server.srp_server_get_services())
def _test_otci_example(self, node1, node2):
def _test_otci_example(self, node1: OTCI, node2: OTCI):
node1.dataset_init_buffer()
node1.dataset_set_buffer(network_name='test',
network_key='00112233445566778899aabbccddeeff',
@ -534,7 +536,7 @@ class TestOTCI(unittest.TestCase):
node2.wait(10)
assert node2.get_state() == "router"
def _test_otci_multi_nodes(self, leader, commissioner, child1, child2):
def _test_otci_multi_nodes(self, leader: OTCI, commissioner: OTCI, child1: OTCI, child2: OTCI):
self.assertFalse(leader.get_ifconfig_state())
# ifconfig up
@ -589,15 +591,15 @@ class TestOTCI(unittest.TestCase):
statistics = commissioner.ping(dst_ip, size=10, count=10, interval=2, hoplimit=3)
self.assertEqual(statistics['transmitted_packets'], 10)
self.assertEqual(statistics['received_packets'], 10)
self.assertAlmostEqual(statistics['packet_loss'], 0.0, delta=1e-9)
rtt = statistics['round_trip_time']
self.assertAlmostEqual(cast(float, statistics['packet_loss']), 0.0, delta=1e-9)
rtt: Dict[str, float] = cast(Dict[str, float], statistics['round_trip_time'])
self.assertTrue(rtt['min'] - 1e-9 <= rtt['avg'] <= rtt['max'] + 1e-9)
commissioner.wait(1)
self.assertEqual('disabled', commissioner.get_commissioiner_state())
self.assertEqual('disabled', commissioner.get_commissioner_state())
commissioner.commissioner_start()
commissioner.wait(5)
self.assertEqual('active', commissioner.get_commissioiner_state())
self.assertEqual('active', commissioner.get_commissioner_state())
logging.info('commissioner.get_network_id_timeout() = %d', commissioner.get_network_id_timeout())
commissioner.set_network_id_timeout(60)
@ -706,7 +708,7 @@ class TestOTCI(unittest.TestCase):
statistics = commissioner.ping("ff02::1", size=1, count=10, interval=1, hoplimit=255)
self.assertEqual(statistics['transmitted_packets'], 10)
self.assertEqual(statistics['received_packets'], 20)
rtt = statistics['round_trip_time']
rtt: Dict[str, float] = cast(Dict[str, float], statistics['round_trip_time'])
self.assertTrue(rtt['min'] - 1e-9 <= rtt['avg'] <= rtt['max'] + 1e-9)
# Shutdown
@ -718,7 +720,7 @@ class TestOTCI(unittest.TestCase):
leader.close()
def _setup_default_network(node):
def _setup_default_network(node: OTCI):
node.dataset_clear_buffer()
node.dataset_set_buffer(
active_timestamp=1,