feat: configure header extraction for ASGI middleware via constructor params (#2026)

* feat: configure header extraction for ASGI middleware via constructor params

* fix django middleware

* lint

* remove import

* Fix lint

* Update instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py
This commit is contained in:
Adrian Garcia Badaracco
2024-01-31 00:11:00 -03:00
committed by GitHub
parent a93bd74dc3
commit 4b1a9c75db
7 changed files with 209 additions and 99 deletions

View File

@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from os import environ
from re import IGNORECASE as RE_IGNORECASE
from re import compile as re_compile
from re import search
from typing import Iterable, List, Optional
from typing import Callable, Iterable, Optional
from urllib.parse import urlparse, urlunparse
from opentelemetry.semconv.trace import SpanAttributes
@ -84,9 +86,12 @@ class SanitizeValue:
)
def sanitize_header_values(
self, headers: dict, header_regexes: list, normalize_function: callable
) -> dict:
values = {}
self,
headers: dict[str, str],
header_regexes: list[str],
normalize_function: Callable[[str], str],
) -> dict[str, str]:
values: dict[str, str] = {}
if header_regexes:
header_regexes_compiled = re_compile(
@ -216,14 +221,14 @@ def sanitize_method(method: Optional[str]) -> Optional[str]:
return "UNKNOWN"
def get_custom_headers(env_var: str) -> List[str]:
custom_headers = environ.get(env_var, [])
def get_custom_headers(env_var: str) -> list[str]:
custom_headers = environ.get(env_var, None)
if custom_headers:
custom_headers = [
return [
custom_headers.strip()
for custom_headers in custom_headers.split(",")
]
return custom_headers
return []
def _parse_active_request_count_attrs(req_attrs):