import re
from inspect import Parameter, Signature
from typing import Optional, Sequence

from fastapi import Depends, HTTPException, status
from makefun import with_signature

from fastapi_users import models
from fastapi_users.authentication.base import BaseAuthentication  # noqa: F401
from fastapi_users.authentication.cookie import CookieAuthentication  # noqa: F401
from fastapi_users.authentication.jwt import JWTAuthentication  # noqa: F401
from fastapi_users.manager import BaseUserManager, UserManagerDependency

# Any character that is not an ASCII letter, digit or underscore.
INVALID_CHARS_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
# A leading run of characters that cannot start an identifier (e.g. digits).
INVALID_LEADING_CHARS_PATTERN = re.compile(r"^[^a-zA-Z_]+")


def name_to_variable_name(name: str) -> str:
    """
    Transform a backend name string into a string safe to use as variable name.

    Characters other than ASCII letters, digits and underscores are removed
    first; then any leading characters that are not a letter or an underscore
    are stripped. The result may be empty if nothing usable remains.

    :param name: Backend name to convert.
    :return: The sanitized name.
    """
    name = INVALID_CHARS_PATTERN.sub("", name)
    name = INVALID_LEADING_CHARS_PATTERN.sub("", name)
    return name


class DuplicateBackendNamesError(Exception):
    """
    Raised by `Authenticator.current_user` when the dependency signature
    cannot be built from the backend names (a `ValueError` from `inspect`),
    e.g. when two backends end up with the same parameter name.
    """


class Authenticator:
    """
    Provides dependency callables to retrieve authenticated user.

    It performs the authentication against a list of backends
    defined by the end-developer. The first backend yielding a user wins.
    If no backend yields a user, an HTTPException is raised.

    :param backends: List of authentication backends.
    :param get_user_manager: User manager dependency callable.
    """

    backends: Sequence[BaseAuthentication]

    def __init__(
        self,
        backends: Sequence[BaseAuthentication],
        get_user_manager: UserManagerDependency[models.UC, models.UD],
    ):
        """Store the backends and the user manager dependency callable."""
        self.backends = backends
        self.get_user_manager = get_user_manager

    def current_user(
        self,
        optional: bool = False,
        active: bool = False,
        verified: bool = False,
        superuser: bool = False,
    ):
        """
        Return a dependency callable to retrieve currently authenticated user.

        :param optional: If `True`, `None` is returned if there is no
        authenticated user or if it doesn't pass the other requirements.
        Otherwise, an `HTTPException` is raised (`401 Unauthorized` or
        `403 Forbidden`, see below). Defaults to `False`.
        :param active: If `True`, throw `401 Unauthorized` if
        the authenticated user is inactive. Defaults to `False`.
        :param verified: If `True`, throw `403 Forbidden` if
        the authenticated user is not verified. Defaults to `False`.
        :param superuser: If `True`, throw `403 Forbidden` if
        the authenticated user is not a superuser. Defaults to `False`.
        :raises DuplicateBackendNamesError: If the dependency signature cannot
        be built from the backend names (`ValueError` raised by `inspect`).
        """
        # Thanks to "makefun", we are able to generate a callable with a
        # dynamic number of dependencies at runtime: one parameter per backend
        # scheme plus one for the user manager.
        # This way, each security scheme is detected by the OpenAPI generator.
        try:
            parameters = [
                Parameter(
                    name=name_to_variable_name(backend.name),
                    kind=Parameter.POSITIONAL_OR_KEYWORD,
                    default=Depends(backend.scheme),  # type: ignore
                )
                for backend in self.backends
            ] + [
                Parameter(
                    name="user_manager",
                    kind=Parameter.POSITIONAL_OR_KEYWORD,
                    default=Depends(self.get_user_manager),
                )
            ]
            signature = Signature(parameters)
        except ValueError as err:
            # Chain the original error so the offending parameter name
            # reported by `inspect` stays visible in the traceback.
            raise DuplicateBackendNamesError() from err

        @with_signature(signature)
        async def current_user_dependency(*args, **kwargs):
            return await self._authenticate(
                *args,
                optional=optional,
                active=active,
                verified=verified,
                superuser=superuser,
                **kwargs
            )

        return current_user_dependency

    async def _authenticate(
        self,
        *args,
        user_manager: BaseUserManager[models.UC, models.UD],
        optional: bool = False,
        active: bool = False,
        verified: bool = False,
        superuser: bool = False,
        **kwargs
    ) -> Optional[models.UD]:
        """
        Try each backend in order and apply the requested user requirements.

        `kwargs` must contain, for every backend, an entry keyed by
        `name_to_variable_name(backend.name)` holding the value resolved from
        that backend's scheme. Backends with a falsy value are skipped.

        :param user_manager: User manager passed to each backend call.
        :param optional: If `True`, return `None` instead of raising.
        :param active: Require `user.is_active`.
        :param verified: Require `user.is_verified`.
        :param superuser: Require `user.is_superuser`.
        :return: The authenticated user, or `None` when `optional` is `True`
        and no user satisfies the requirements.
        :raises HTTPException: `401 Unauthorized` if no backend yields a user
        or the user is inactive while `active` is set; `403 Forbidden` if the
        `verified` or `superuser` requirement is not met.
        """
        user: Optional[models.UD] = None
        for backend in self.backends:
            token: str = kwargs[name_to_variable_name(backend.name)]
            if token:
                user = await backend(token, user_manager)
                # The first backend yielding a user wins.
                if user:
                    break

        # No user at all means the request is unauthenticated.
        status_code = status.HTTP_401_UNAUTHORIZED
        if user:
            # A user was found: failing a requirement is a permission issue,
            # except for inactive users which are treated as unauthenticated.
            status_code = status.HTTP_403_FORBIDDEN
            if active and not user.is_active:
                status_code = status.HTTP_401_UNAUTHORIZED
                user = None
            elif verified and not user.is_verified:
                user = None
            elif superuser and not user.is_superuser:
                user = None

        if not user and not optional:
            raise HTTPException(status_code=status_code)
        return user