Add type-checking to DAP requests

It occurred to me recently that gdb's DAP implementation should
probably check the types of objects coming from the client.  This
patch implements this idea by reusing Python's existing type
annotations, and supplying a decorator that verifies these at runtime.

Python doesn't make it very easy to do runtime type-checking, so the
core of the checker is written by hand.  I haven't tried to make a
fully generic runtime type checker.  Instead, this only checks the
subset that is needed by DAP.  For example, only keyword-only
functions are handled.

Furthermore, in a few spots, it wasn't convenient to spell out the
type that is accepted.  I've added a couple of comments to this effect
in breakpoint.py.

I've tried to make this code compatible with older versions of Python,
but I've only been able to try it with 3.9 and 3.10.
This commit is contained in:
Tom Tromey
2023-05-11 12:20:05 -06:00
parent 5e20b4cd17
commit 510586589e
13 changed files with 287 additions and 28 deletions

View File

@ -16,6 +16,9 @@
import gdb
import os
# These are deprecated in 3.9, but required in older versions.
from typing import Optional, Sequence
from .server import request, capability
from .startup import send_gdb_with_response, in_gdb_thread
@ -95,8 +98,10 @@ def _set_breakpoints(kind, specs):
return _set_breakpoints_callback(kind, specs, gdb.Breakpoint)
# FIXME we do not specify a type for 'source'.
# FIXME 'breakpoints' is really a list[SourceBreakpoint].
@request("setBreakpoints")
def set_breakpoint(*, source, breakpoints=(), **args):
def set_breakpoint(*, source, breakpoints: Sequence = (), **args):
if "path" not in source:
result = []
else:
@ -119,7 +124,7 @@ def set_breakpoint(*, source, breakpoints=(), **args):
@request("setFunctionBreakpoints")
@capability("supportsFunctionBreakpoints")
def set_fn_breakpoint(*, breakpoints, **args):
def set_fn_breakpoint(*, breakpoints: Sequence, **args):
specs = []
for bp in breakpoints:
specs.append(
@ -135,7 +140,9 @@ def set_fn_breakpoint(*, breakpoints, **args):
@request("setInstructionBreakpoints")
@capability("supportsInstructionBreakpoints")
def set_insn_breakpoints(*, breakpoints, offset=None, **args):
def set_insn_breakpoints(
*, breakpoints: Sequence, offset: Optional[int] = None, **args
):
specs = []
for bp in breakpoints:
# There's no way to set an explicit address breakpoint
@ -179,16 +186,24 @@ def _set_exception_catchpoints(filter_options):
@request("setExceptionBreakpoints")
@capability("supportsExceptionFilterOptions")
@capability("exceptionBreakpointFilters", ({
"filter": "assert",
"label": "Ada assertions",
"supportsCondition": True,
}, {
"filter": "exception",
"label": "Ada exceptions",
"supportsCondition": True,
}))
def set_exception_breakpoints(*, filters, filterOptions=(), **args):
@capability(
"exceptionBreakpointFilters",
(
{
"filter": "assert",
"label": "Ada assertions",
"supportsCondition": True,
},
{
"filter": "exception",
"label": "Ada exceptions",
"supportsCondition": True,
},
),
)
def set_exception_breakpoints(
*, filters: Sequence[str], filterOptions: Sequence = (), **args
):
# Convert the 'filters' to the filter-options style.
options = [{"filterId": filter} for filter in filters]
options.extend(filterOptions)

View File

@ -88,5 +88,5 @@ def _backtrace(thread_id, levels, startFrame):
@request("stackTrace")
@capability("supportsDelayedStackTraceLoading")
def stacktrace(*, levels=0, startFrame=0, threadId, **extra):
def stacktrace(*, levels: int = 0, startFrame: int = 0, threadId: int, **extra):
return send_gdb_with_response(lambda: _backtrace(threadId, levels, startFrame))

View File

@ -43,7 +43,12 @@ def _disassemble(pc, skip_insns, count):
@request("disassemble")
@capability("supportsDisassembleRequest")
def disassemble(
*, memoryReference, offset=0, instructionOffset=0, instructionCount, **extra
*,
memoryReference: str,
offset: int = 0,
instructionOffset: int = 0,
instructionCount: int,
**extra
):
pc = int(memoryReference, 0) + offset
return send_gdb_with_response(

View File

@ -16,6 +16,9 @@
import gdb
import gdb.printing
# This is deprecated in 3.9, but required in older versions.
from typing import Optional
from .frames import frame_for_id
from .server import request
from .startup import send_gdb_with_response, in_gdb_thread
@ -55,7 +58,13 @@ def _repl(command, frame_id):
# FIXME supportsVariableType handling
@request("evaluate")
def eval_request(*, expression, frameId=None, context="variables", **args):
def eval_request(
*,
expression: str,
frameId: Optional[int] = None,
context: str = "variables",
**args,
):
if context in ("watch", "variables"):
# These seem to be expression-like.
return send_gdb_with_response(lambda: _evaluate(expression, frameId))
@ -75,7 +84,7 @@ def _variables(ref, start, count):
@request("variables")
# Note that we ignore the 'filter' field. That seems to be
# specific to javascript.
def variables(*, variablesReference, start=0, count=0, **args):
def variables(*, variablesReference: int, start: int = 0, count: int = 0, **args):
result = send_gdb_with_response(
lambda: _variables(variablesReference, start, count)
)

View File

@ -14,6 +14,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import gdb
# These are deprecated in 3.9, but required in older versions.
from typing import Mapping, Optional, Sequence
from .events import ExecutionInvoker
from .server import request, capability
from .startup import send_gdb, send_gdb_with_response, in_gdb_thread
@ -36,7 +40,13 @@ def _set_args_env(args, env):
# from implementations. Any additions or changes here should be
# documented in the gdb manual.
@request("launch")
def launch(*, program=None, args=(), env=None, **extra):
def launch(
*,
program: Optional[str] = None,
args: Sequence[str] = (),
env: Optional[Mapping[str, str]] = None,
**extra,
):
if program is not None:
global _program
_program = program
@ -46,7 +56,7 @@ def launch(*, program=None, args=(), env=None, **extra):
@request("attach")
def attach(*, pid, **args):
def attach(*, pid: int, **args):
# Ensure configurationDone does not try to run.
global _program
_program = None

View File

@ -28,7 +28,7 @@ def _read_memory(addr, count):
@request("readMemory")
@capability("supportsReadMemoryRequest")
def read_memory(*, memoryReference, offset=0, count, **extra):
def read_memory(*, memoryReference: str, offset: int = 0, count: int, **extra):
addr = int(memoryReference, 0) + offset
buf = send_gdb_with_response(lambda: _read_memory(addr, count))
return {
@ -45,7 +45,7 @@ def _write_memory(addr, contents):
@request("writeMemory")
@capability("supportsWriteMemoryRequest")
def write_memory(*, memoryReference, offset=0, data, **extra):
def write_memory(*, memoryReference: str, offset: int = 0, data: str, **extra):
addr = int(memoryReference, 0) + offset
send_gdb_with_response(lambda: _write_memory(addr, data))
return {}

View File

@ -45,7 +45,9 @@ def _handle_thread_step(thread_id, single_thread):
@request("next")
def next(*, threadId, singleThread=False, granularity="statement", **args):
def next(
*, threadId: int, singleThread: bool = False, granularity: str = "statement", **args
):
send_gdb(lambda: _handle_thread_step(threadId, singleThread))
cmd = "next"
if granularity == "instruction":
@ -56,7 +58,9 @@ def next(*, threadId, singleThread=False, granularity="statement", **args):
@capability("supportsSteppingGranularity")
@capability("supportsSingleThreadExecutionRequests")
@request("stepIn")
def step_in(*, threadId, singleThread=False, granularity="statement", **args):
def step_in(
*, threadId: int, singleThread: bool = False, granularity: str = "statement", **args
):
send_gdb(lambda: _handle_thread_step(threadId, singleThread))
cmd = "step"
if granularity == "instruction":
@ -65,13 +69,13 @@ def step_in(*, threadId, singleThread=False, granularity="statement", **args):
@request("stepOut")
def step_out(*, threadId, singleThread=False):
def step_out(*, threadId: int, singleThread: bool = False):
send_gdb(lambda: _handle_thread_step(threadId, singleThread))
send_gdb(ExecutionInvoker("finish", StopKinds.STEP))
@request("continue")
def continue_request(*, threadId, singleThread=False, **args):
def continue_request(*, threadId: int, singleThread: bool = False, **args):
locked = send_gdb_with_response(lambda: _handle_thread_step(threadId, singleThread))
send_gdb(ExecutionInvoker("continue", None))
return {"allThreadsContinued": not locked}

View File

@ -109,6 +109,6 @@ def _get_scope(id):
@request("scopes")
def scopes(*, frameId, **extra):
def scopes(*, frameId: int, **extra):
scopes = send_gdb_with_response(lambda: _get_scope(frameId))
return {"scopes": scopes}

View File

@ -25,6 +25,7 @@ from .startup import (
log_stack,
send_gdb_with_response,
)
from .typecheck import type_check
# Map capability names to values.
@ -165,7 +166,8 @@ def request(name):
def wrap(func):
global _commands
# All requests must run in the DAP thread.
func = in_dap_thread(func)
# Also type-check the calls.
func = in_dap_thread(type_check(func))
_commands[name] = func
return func
@ -202,7 +204,7 @@ def terminate(**args):
@request("disconnect")
@capability("supportTerminateDebuggee")
def disconnect(*, terminateDebuggee=False, **args):
def disconnect(*, terminateDebuggee: bool = False, **args):
if terminateDebuggee:
terminate()
_server.shutdown()

View File

@ -0,0 +1,88 @@
# Copyright 2023 Free Software Foundation, Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# A simple runtime type checker.
import collections.abc
import functools
import typing
# 'isinstance' won't work in general for type variables, so we
# implement the subset that is needed by DAP.
def _check_instance(value, typevar):
base = typing.get_origin(typevar)
if base is None:
return isinstance(value, typevar)
arg_types = typing.get_args(typevar)
if base == collections.abc.Mapping or base == typing.Mapping:
if not isinstance(value, collections.abc.Mapping):
return False
assert len(arg_types) == 2
(keytype, valuetype) = arg_types
return all(
_check_instance(k, keytype) and _check_instance(v, valuetype)
for k, v in value.items()
)
elif base == collections.abc.Sequence or base == typing.Sequence:
# In some places we simply use 'Sequence' without arguments.
if not isinstance(value, base):
return False
if len(arg_types) == 0:
return True
assert len(arg_types) == 1
arg_type = arg_types[0]
return all(_check_instance(item, arg_type) for item in value)
elif base == typing.Union:
return any(_check_instance(value, arg_type) for arg_type in arg_types)
raise TypeError("unsupported type variable '" + str(typevar) + "'")
def type_check(func):
"""A decorator that checks FUNC's argument types at runtime."""
# The type checker relies on 'typing.get_origin', which was added
# in Python 3.8. (It also relies on 'typing.get_args', but that
# was added at the same time.)
if not hasattr(typing, "get_origin"):
return func
hints = typing.get_type_hints(func)
# We don't check the return type, but we allow it in case someone
# wants to use it on a function definition.
if "return" in hints:
del hints["return"]
# Note that keyword-only is fine for our purposes, because this is
# only used for DAP requests, and those are always called this
# way.
@functools.wraps(func)
def check_arguments(**kwargs):
for key in hints:
# The argument might not be passed in; we could type-check
# any default value here, but it seems fine to just rely
# on the code being correct -- the main goal of this
# checking is to verify JSON coming from the client.
if key in kwargs and not _check_instance(kwargs[key], hints[key]):
raise TypeError(
"value for '"
+ key
+ "' does not have expected type '"
+ str(hints[key])
+ "'"
)
return func(**kwargs)
return check_arguments