mirror of
				https://github.com/fastapi-users/fastapi-users.git
				synced 2025-11-04 22:56:56 +08:00 
			
		
		
		
	* Implement Transport classes * Implement authentication strategy classes * Revamp authentication with Transport and Strategy * Revamp strategy and OAuth so that they can use a callable dependency * Update docstring * Make ErrorCode a proper Enum and cleanup unused OpenAPI utils * Remove useless check * Tweak typing in authenticator * Update docs * Improve logout/destroy token logic * Update docs * Update docs * Update docs and full examples * Apply formatting to examples * Update OAuth doc and examples * Add migration doc * Implement Redis session token * Add Redis Session documentation * RedisSession -> Redis * Fix links in docs
		
			
				
	
	
		
			232 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			232 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
from inspect import Parameter, Signature
 | 
						|
from typing import Callable, List, Optional, Sequence, Tuple, cast
 | 
						|
 | 
						|
from fastapi import Depends, HTTPException, status
 | 
						|
from makefun import with_signature
 | 
						|
 | 
						|
from fastapi_users import models
 | 
						|
from fastapi_users.authentication.backend import AuthenticationBackend
 | 
						|
from fastapi_users.authentication.strategy import Strategy
 | 
						|
from fastapi_users.manager import BaseUserManager, UserManagerDependency
 | 
						|
 | 
						|
INVALID_CHARS_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
 | 
						|
INVALID_LEADING_CHARS_PATTERN = re.compile(r"^[^a-zA-Z_]+")
 | 
						|
 | 
						|
 | 
						|
def name_to_variable_name(name: str) -> str:
 | 
						|
    """Transform a backend name string into a string safe to use as variable name."""
 | 
						|
    name = re.sub(INVALID_CHARS_PATTERN, "", name)
 | 
						|
    name = re.sub(INVALID_LEADING_CHARS_PATTERN, "", name)
 | 
						|
    return name
 | 
						|
 | 
						|
 | 
						|
def name_to_strategy_variable_name(name: str) -> str:
 | 
						|
    """Transform a backend name string into a strategy variable name."""
 | 
						|
    return f"strategy_{name_to_variable_name(name)}"
 | 
						|
 | 
						|
 | 
						|
class DuplicateBackendNamesError(Exception):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
EnabledBackendsDependency = Callable[..., Sequence[AuthenticationBackend]]
 | 
						|
 | 
						|
 | 
						|
class Authenticator:
 | 
						|
    """
 | 
						|
    Provides dependency callables to retrieve authenticated user.
 | 
						|
 | 
						|
    It performs the authentication against a list of backends
 | 
						|
    defined by the end-developer. The first backend yielding a user wins.
 | 
						|
    If no backend yields a user, an HTTPException is raised.
 | 
						|
 | 
						|
    :param backends: List of authentication backends.
 | 
						|
    :param get_user_manager: User manager dependency callable.
 | 
						|
    """
 | 
						|
 | 
						|
    backends: Sequence[AuthenticationBackend]
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        backends: Sequence[AuthenticationBackend],
 | 
						|
        get_user_manager: UserManagerDependency[models.UC, models.UD],
 | 
						|
    ):
 | 
						|
        self.backends = backends
 | 
						|
        self.get_user_manager = get_user_manager
 | 
						|
 | 
						|
    def current_user_token(
 | 
						|
        self,
 | 
						|
        optional: bool = False,
 | 
						|
        active: bool = False,
 | 
						|
        verified: bool = False,
 | 
						|
        superuser: bool = False,
 | 
						|
        get_enabled_backends: Optional[EnabledBackendsDependency] = None,
 | 
						|
    ):
 | 
						|
        """
 | 
						|
        Return a dependency callable to retrieve currently authenticated user and token.
 | 
						|
 | 
						|
        :param optional: If `True`, `None` is returned if there is no authenticated user
 | 
						|
        or if it doesn't pass the other requirements.
 | 
						|
        Otherwise, throw `401 Unauthorized`. Defaults to `False`.
 | 
						|
        Otherwise, an exception is raised. Defaults to `False`.
 | 
						|
        :param active: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is inactive. Defaults to `False`.
 | 
						|
        :param verified: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is not verified. Defaults to `False`.
 | 
						|
        :param superuser: If `True`, throw `403 Forbidden` if
 | 
						|
        the authenticated user is not a superuser. Defaults to `False`.
 | 
						|
        :param get_enabled_backends: Optional dependency callable returning
 | 
						|
        a list of enabled authentication backends.
 | 
						|
        Useful if you want to dynamically enable some authentication backends
 | 
						|
        based on external logic, like a configuration in database.
 | 
						|
        By default, all specified authentication backends are enabled.
 | 
						|
        Please not however that every backends will appear in the OpenAPI documentation,
 | 
						|
        as FastAPI resolves it statically.
 | 
						|
        """
 | 
						|
        signature = self._get_dependency_signature(get_enabled_backends)
 | 
						|
 | 
						|
        @with_signature(signature)
 | 
						|
        async def current_user_token_dependency(*args, **kwargs):
 | 
						|
            return await self._authenticate(
 | 
						|
                *args,
 | 
						|
                optional=optional,
 | 
						|
                active=active,
 | 
						|
                verified=verified,
 | 
						|
                superuser=superuser,
 | 
						|
                **kwargs,
 | 
						|
            )
 | 
						|
 | 
						|
        return current_user_token_dependency
 | 
						|
 | 
						|
    def current_user(
 | 
						|
        self,
 | 
						|
        optional: bool = False,
 | 
						|
        active: bool = False,
 | 
						|
        verified: bool = False,
 | 
						|
        superuser: bool = False,
 | 
						|
        get_enabled_backends: Optional[EnabledBackendsDependency] = None,
 | 
						|
    ):
 | 
						|
        """
 | 
						|
        Return a dependency callable to retrieve currently authenticated user.
 | 
						|
 | 
						|
        :param optional: If `True`, `None` is returned if there is no authenticated user
 | 
						|
        or if it doesn't pass the other requirements.
 | 
						|
        Otherwise, throw `401 Unauthorized`. Defaults to `False`.
 | 
						|
        Otherwise, an exception is raised. Defaults to `False`.
 | 
						|
        :param active: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is inactive. Defaults to `False`.
 | 
						|
        :param verified: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is not verified. Defaults to `False`.
 | 
						|
        :param superuser: If `True`, throw `403 Forbidden` if
 | 
						|
        the authenticated user is not a superuser. Defaults to `False`.
 | 
						|
        :param get_enabled_backends: Optional dependency callable returning
 | 
						|
        a list of enabled authentication backends.
 | 
						|
        Useful if you want to dynamically enable some authentication backends
 | 
						|
        based on external logic, like a configuration in database.
 | 
						|
        By default, all specified authentication backends are enabled.
 | 
						|
        Please not however that every backends will appear in the OpenAPI documentation,
 | 
						|
        as FastAPI resolves it statically.
 | 
						|
        """
 | 
						|
        signature = self._get_dependency_signature(get_enabled_backends)
 | 
						|
 | 
						|
        @with_signature(signature)
 | 
						|
        async def current_user_dependency(*args, **kwargs):
 | 
						|
            user, _ = await self._authenticate(
 | 
						|
                *args,
 | 
						|
                optional=optional,
 | 
						|
                active=active,
 | 
						|
                verified=verified,
 | 
						|
                superuser=superuser,
 | 
						|
                **kwargs,
 | 
						|
            )
 | 
						|
            return user
 | 
						|
 | 
						|
        return current_user_dependency
 | 
						|
 | 
						|
    async def _authenticate(
 | 
						|
        self,
 | 
						|
        *args,
 | 
						|
        user_manager: BaseUserManager[models.UC, models.UD],
 | 
						|
        optional: bool = False,
 | 
						|
        active: bool = False,
 | 
						|
        verified: bool = False,
 | 
						|
        superuser: bool = False,
 | 
						|
        **kwargs,
 | 
						|
    ) -> Tuple[Optional[models.UD], Optional[str]]:
 | 
						|
        user: Optional[models.UD] = None
 | 
						|
        token: Optional[str] = None
 | 
						|
        enabled_backends: Sequence[AuthenticationBackend] = kwargs.get(
 | 
						|
            "enabled_backends", self.backends
 | 
						|
        )
 | 
						|
        for backend in self.backends:
 | 
						|
            if backend in enabled_backends:
 | 
						|
                token = kwargs[name_to_variable_name(backend.name)]
 | 
						|
                strategy: Strategy[models.UC, models.UD] = kwargs[
 | 
						|
                    name_to_strategy_variable_name(backend.name)
 | 
						|
                ]
 | 
						|
                if token is not None:
 | 
						|
                    user = await strategy.read_token(token, user_manager)
 | 
						|
                    if user:
 | 
						|
                        break
 | 
						|
 | 
						|
        status_code = status.HTTP_401_UNAUTHORIZED
 | 
						|
        if user:
 | 
						|
            status_code = status.HTTP_403_FORBIDDEN
 | 
						|
            if active and not user.is_active:
 | 
						|
                status_code = status.HTTP_401_UNAUTHORIZED
 | 
						|
                user = None
 | 
						|
            elif (
 | 
						|
                verified and not user.is_verified or superuser and not user.is_superuser
 | 
						|
            ):
 | 
						|
                user = None
 | 
						|
        if not user and not optional:
 | 
						|
            raise HTTPException(status_code=status_code)
 | 
						|
        return user, token
 | 
						|
 | 
						|
    def _get_dependency_signature(
 | 
						|
        self, get_enabled_backends: Optional[EnabledBackendsDependency] = None
 | 
						|
    ) -> Signature:
 | 
						|
        """
 | 
						|
        Generate a dynamic signature for the current_user dependency.
 | 
						|
 | 
						|
        Here comes some blood magic 🧙♂️
 | 
						|
        Thank to "makefun", we are able to generate callable
 | 
						|
        with a dynamic number of dependencies at runtime.
 | 
						|
        This way, each security schemes are detected by the OpenAPI generator.
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            parameters: List[Parameter] = [
 | 
						|
                Parameter(
 | 
						|
                    name="user_manager",
 | 
						|
                    kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                    default=Depends(self.get_user_manager),
 | 
						|
                )
 | 
						|
            ]
 | 
						|
 | 
						|
            for backend in self.backends:
 | 
						|
                parameters += [
 | 
						|
                    Parameter(
 | 
						|
                        name=name_to_variable_name(backend.name),
 | 
						|
                        kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                        default=Depends(cast(Callable, backend.transport.scheme)),
 | 
						|
                    ),
 | 
						|
                    Parameter(
 | 
						|
                        name=name_to_strategy_variable_name(backend.name),
 | 
						|
                        kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                        default=Depends(backend.get_strategy),
 | 
						|
                    ),
 | 
						|
                ]
 | 
						|
 | 
						|
            if get_enabled_backends is not None:
 | 
						|
                parameters += [
 | 
						|
                    Parameter(
 | 
						|
                        name="enabled_backends",
 | 
						|
                        kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                        default=Depends(get_enabled_backends),
 | 
						|
                    )
 | 
						|
                ]
 | 
						|
            return Signature(parameters)
 | 
						|
        except ValueError:
 | 
						|
            raise DuplicateBackendNamesError()
 |