mirror of
				https://github.com/fastapi-users/fastapi-users.git
				synced 2025-11-04 06:37:51 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			140 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			140 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
from inspect import Parameter, Signature
 | 
						|
from typing import Optional, Sequence
 | 
						|
 | 
						|
from fastapi import Depends, HTTPException, status
 | 
						|
from makefun import with_signature
 | 
						|
 | 
						|
from fastapi_users import models
 | 
						|
from fastapi_users.authentication.base import BaseAuthentication  # noqa: F401
 | 
						|
from fastapi_users.authentication.cookie import CookieAuthentication  # noqa: F401
 | 
						|
from fastapi_users.authentication.jwt import JWTAuthentication  # noqa: F401
 | 
						|
from fastapi_users.manager import BaseUserManager, UserManagerDependency
 | 
						|
 | 
						|
INVALID_CHARS_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
 | 
						|
INVALID_LEADING_CHARS_PATTERN = re.compile(r"^[^a-zA-Z_]+")
 | 
						|
 | 
						|
 | 
						|
def name_to_variable_name(name: str) -> str:
 | 
						|
    """Transform a backend name string into a string safe to use as variable name."""
 | 
						|
    name = re.sub(INVALID_CHARS_PATTERN, "", name)
 | 
						|
    name = re.sub(INVALID_LEADING_CHARS_PATTERN, "", name)
 | 
						|
    return name
 | 
						|
 | 
						|
 | 
						|
class DuplicateBackendNamesError(Exception):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class Authenticator:
 | 
						|
    """
 | 
						|
    Provides dependency callables to retrieve authenticated user.
 | 
						|
 | 
						|
    It performs the authentication against a list of backends
 | 
						|
    defined by the end-developer. The first backend yielding a user wins.
 | 
						|
    If no backend yields a user, an HTTPException is raised.
 | 
						|
 | 
						|
    :param backends: List of authentication backends.
 | 
						|
    :param get_user_manager: User manager dependency callable.
 | 
						|
    """
 | 
						|
 | 
						|
    backends: Sequence[BaseAuthentication]
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        backends: Sequence[BaseAuthentication],
 | 
						|
        get_user_manager: UserManagerDependency[models.UC, models.UD],
 | 
						|
    ):
 | 
						|
        self.backends = backends
 | 
						|
        self.get_user_manager = get_user_manager
 | 
						|
 | 
						|
    def current_user(
 | 
						|
        self,
 | 
						|
        optional: bool = False,
 | 
						|
        active: bool = False,
 | 
						|
        verified: bool = False,
 | 
						|
        superuser: bool = False,
 | 
						|
    ):
 | 
						|
        """
 | 
						|
        Return a dependency callable to retrieve currently authenticated user.
 | 
						|
 | 
						|
        :param optional: If `True`, `None` is returned if there is no authenticated user
 | 
						|
        or if it doesn't pass the other requirements.
 | 
						|
        Otherwise, throw `401 Unauthorized`. Defaults to `False`.
 | 
						|
        Otherwise, an exception is raised. Defaults to `False`.
 | 
						|
        :param active: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is inactive. Defaults to `False`.
 | 
						|
        :param verified: If `True`, throw `401 Unauthorized` if
 | 
						|
        the authenticated user is not verified. Defaults to `False`.
 | 
						|
        :param superuser: If `True`, throw `403 Forbidden` if
 | 
						|
        the authenticated user is not a superuser. Defaults to `False`.
 | 
						|
        """
 | 
						|
        # Here comes some blood magic 🧙♂️
 | 
						|
        # Thank to "makefun", we are able to generate callable
 | 
						|
        # with a dynamic number of dependencies at runtime.
 | 
						|
        # This way, each security schemes are detected by the OpenAPI generator.
 | 
						|
        try:
 | 
						|
            parameters = [
 | 
						|
                Parameter(
 | 
						|
                    name=name_to_variable_name(backend.name),
 | 
						|
                    kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                    default=Depends(backend.scheme),  # type: ignore
 | 
						|
                )
 | 
						|
                for backend in self.backends
 | 
						|
            ] + [
 | 
						|
                Parameter(
 | 
						|
                    name="user_manager",
 | 
						|
                    kind=Parameter.POSITIONAL_OR_KEYWORD,
 | 
						|
                    default=Depends(self.get_user_manager),
 | 
						|
                )
 | 
						|
            ]
 | 
						|
            signature = Signature(parameters)
 | 
						|
        except ValueError:
 | 
						|
            raise DuplicateBackendNamesError()
 | 
						|
 | 
						|
        @with_signature(signature)
 | 
						|
        async def current_user_dependency(*args, **kwargs):
 | 
						|
            return await self._authenticate(
 | 
						|
                *args,
 | 
						|
                optional=optional,
 | 
						|
                active=active,
 | 
						|
                verified=verified,
 | 
						|
                superuser=superuser,
 | 
						|
                **kwargs
 | 
						|
            )
 | 
						|
 | 
						|
        return current_user_dependency
 | 
						|
 | 
						|
    async def _authenticate(
 | 
						|
        self,
 | 
						|
        *args,
 | 
						|
        user_manager: BaseUserManager[models.UC, models.UD],
 | 
						|
        optional: bool = False,
 | 
						|
        active: bool = False,
 | 
						|
        verified: bool = False,
 | 
						|
        superuser: bool = False,
 | 
						|
        **kwargs
 | 
						|
    ) -> Optional[models.UD]:
 | 
						|
        user: Optional[models.UD] = None
 | 
						|
        for backend in self.backends:
 | 
						|
            token: str = kwargs[name_to_variable_name(backend.name)]
 | 
						|
            if token:
 | 
						|
                user = await backend(token, user_manager)
 | 
						|
                if user:
 | 
						|
                    break
 | 
						|
 | 
						|
        status_code = status.HTTP_401_UNAUTHORIZED
 | 
						|
        if user:
 | 
						|
            status_code = status.HTTP_403_FORBIDDEN
 | 
						|
            if active and not user.is_active:
 | 
						|
                status_code = status.HTTP_401_UNAUTHORIZED
 | 
						|
                user = None
 | 
						|
            elif verified and not user.is_verified:
 | 
						|
                user = None
 | 
						|
            elif superuser and not user.is_superuser:
 | 
						|
                user = None
 | 
						|
 | 
						|
        if not user and not optional:
 | 
						|
            raise HTTPException(status_code=status_code)
 | 
						|
        return user
 |