Files
Alexander Zinov abfa9a1c47 Improve type hints (#1401)
* Add type parameters to `AuthenticationBackend`

* add more type-hints
2024-07-14 15:04:13 +02:00

233 lines
9.1 KiB
Python

import re
from inspect import Parameter, Signature
from typing import Any, Callable, Generic, List, Optional, Sequence, Tuple, cast
from fastapi import Depends, HTTPException, status
from makefun import with_signature
from fastapi_users import models
from fastapi_users.authentication.backend import AuthenticationBackend
from fastapi_users.authentication.strategy import Strategy
from fastapi_users.manager import BaseUserManager, UserManagerDependency
from fastapi_users.types import DependencyCallable
INVALID_CHARS_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
INVALID_LEADING_CHARS_PATTERN = re.compile(r"^[^a-zA-Z_]+")
def name_to_variable_name(name: str) -> str:
"""Transform a backend name string into a string safe to use as variable name."""
name = re.sub(INVALID_CHARS_PATTERN, "", name)
name = re.sub(INVALID_LEADING_CHARS_PATTERN, "", name)
return name
def name_to_strategy_variable_name(name: str) -> str:
"""Transform a backend name string into a strategy variable name."""
return f"strategy_{name_to_variable_name(name)}"
class DuplicateBackendNamesError(Exception):
pass
EnabledBackendsDependency = DependencyCallable[Sequence[AuthenticationBackend[models.UP, models.ID]]]
class Authenticator(Generic[models.UP, models.ID]):
"""
Provides dependency callables to retrieve authenticated user.
It performs the authentication against a list of backends
defined by the end-developer. The first backend yielding a user wins.
If no backend yields a user, an HTTPException is raised.
:param backends: List of authentication backends.
:param get_user_manager: User manager dependency callable.
"""
backends: Sequence[AuthenticationBackend[models.UP, models.ID]]
def __init__(
self,
backends: Sequence[AuthenticationBackend[models.UP, models.ID]],
get_user_manager: UserManagerDependency[models.UP, models.ID],
):
self.backends = backends
self.get_user_manager = get_user_manager
def current_user_token(
self,
optional: bool = False,
active: bool = False,
verified: bool = False,
superuser: bool = False,
get_enabled_backends: Optional[EnabledBackendsDependency[models.UP, models.ID]] = None,
):
"""
Return a dependency callable to retrieve currently authenticated user and token.
:param optional: If `True`, `None` is returned if there is no authenticated user
or if it doesn't pass the other requirements.
Otherwise, throw `401 Unauthorized`. Defaults to `False`.
Otherwise, an exception is raised. Defaults to `False`.
:param active: If `True`, throw `401 Unauthorized` if
the authenticated user is inactive. Defaults to `False`.
:param verified: If `True`, throw `401 Unauthorized` if
the authenticated user is not verified. Defaults to `False`.
:param superuser: If `True`, throw `403 Forbidden` if
the authenticated user is not a superuser. Defaults to `False`.
:param get_enabled_backends: Optional dependency callable returning
a list of enabled authentication backends.
Useful if you want to dynamically enable some authentication backends
based on external logic, like a configuration in database.
By default, all specified authentication backends are enabled.
Please not however that every backends will appear in the OpenAPI documentation,
as FastAPI resolves it statically.
"""
signature = self._get_dependency_signature(get_enabled_backends)
@with_signature(signature)
async def current_user_token_dependency(*args: Any, **kwargs: Any):
return await self._authenticate(
*args,
optional=optional,
active=active,
verified=verified,
superuser=superuser,
**kwargs,
)
return current_user_token_dependency
def current_user(
self,
optional: bool = False,
active: bool = False,
verified: bool = False,
superuser: bool = False,
get_enabled_backends: Optional[EnabledBackendsDependency[models.UP, models.ID]] = None,
):
"""
Return a dependency callable to retrieve currently authenticated user.
:param optional: If `True`, `None` is returned if there is no authenticated user
or if it doesn't pass the other requirements.
Otherwise, throw `401 Unauthorized`. Defaults to `False`.
Otherwise, an exception is raised. Defaults to `False`.
:param active: If `True`, throw `401 Unauthorized` if
the authenticated user is inactive. Defaults to `False`.
:param verified: If `True`, throw `401 Unauthorized` if
the authenticated user is not verified. Defaults to `False`.
:param superuser: If `True`, throw `403 Forbidden` if
the authenticated user is not a superuser. Defaults to `False`.
:param get_enabled_backends: Optional dependency callable returning
a list of enabled authentication backends.
Useful if you want to dynamically enable some authentication backends
based on external logic, like a configuration in database.
By default, all specified authentication backends are enabled.
Please not however that every backends will appear in the OpenAPI documentation,
as FastAPI resolves it statically.
"""
signature = self._get_dependency_signature(get_enabled_backends)
@with_signature(signature)
async def current_user_dependency(*args: Any, **kwargs: Any):
user, _ = await self._authenticate(
*args,
optional=optional,
active=active,
verified=verified,
superuser=superuser,
**kwargs,
)
return user
return current_user_dependency
async def _authenticate(
self,
*args,
user_manager: BaseUserManager[models.UP, models.ID],
optional: bool = False,
active: bool = False,
verified: bool = False,
superuser: bool = False,
**kwargs,
) -> Tuple[Optional[models.UP], Optional[str]]:
user: Optional[models.UP] = None
token: Optional[str] = None
enabled_backends: Sequence[AuthenticationBackend[models.UP, models.ID]] = kwargs.get(
"enabled_backends", self.backends
)
for backend in self.backends:
if backend in enabled_backends:
token = kwargs[name_to_variable_name(backend.name)]
strategy: Strategy[models.UP, models.ID] = kwargs[
name_to_strategy_variable_name(backend.name)
]
if token is not None:
user = await strategy.read_token(token, user_manager)
if user:
break
status_code = status.HTTP_401_UNAUTHORIZED
if user:
status_code = status.HTTP_403_FORBIDDEN
if active and not user.is_active:
status_code = status.HTTP_401_UNAUTHORIZED
user = None
elif (
verified and not user.is_verified or superuser and not user.is_superuser
):
user = None
if not user and not optional:
raise HTTPException(status_code=status_code)
return user, token
def _get_dependency_signature(
self, get_enabled_backends: Optional[EnabledBackendsDependency] = None
) -> Signature:
"""
Generate a dynamic signature for the current_user dependency.
Here comes some blood magic 🧙‍♂️
Thank to "makefun", we are able to generate callable
with a dynamic number of dependencies at runtime.
This way, each security schemes are detected by the OpenAPI generator.
"""
try:
parameters: List[Parameter] = [
Parameter(
name="user_manager",
kind=Parameter.POSITIONAL_OR_KEYWORD,
default=Depends(self.get_user_manager),
)
]
for backend in self.backends:
parameters += [
Parameter(
name=name_to_variable_name(backend.name),
kind=Parameter.POSITIONAL_OR_KEYWORD,
default=Depends(cast(Callable, backend.transport.scheme)),
),
Parameter(
name=name_to_strategy_variable_name(backend.name),
kind=Parameter.POSITIONAL_OR_KEYWORD,
default=Depends(backend.get_strategy),
),
]
if get_enabled_backends is not None:
parameters += [
Parameter(
name="enabled_backends",
kind=Parameter.POSITIONAL_OR_KEYWORD,
default=Depends(get_enabled_backends),
)
]
return Signature(parameters)
except ValueError:
raise DuplicateBackendNamesError()