mirror of
				https://github.com/fastapi-users/fastapi-users.git
				synced 2025-10-31 09:28:45 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			240 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | |
| from collections.abc import Sequence
 | |
| from inspect import Parameter, Signature
 | |
| from typing import Any, Callable, Generic, Optional, cast
 | |
| 
 | |
| from fastapi import Depends, HTTPException, status
 | |
| from makefun import with_signature
 | |
| 
 | |
| from fastapi_users import models
 | |
| from fastapi_users.authentication.backend import AuthenticationBackend
 | |
| from fastapi_users.authentication.strategy import Strategy
 | |
| from fastapi_users.manager import BaseUserManager, UserManagerDependency
 | |
| from fastapi_users.types import DependencyCallable
 | |
| 
 | |
| INVALID_CHARS_PATTERN = re.compile(r"[^0-9a-zA-Z_]")
 | |
| INVALID_LEADING_CHARS_PATTERN = re.compile(r"^[^a-zA-Z_]+")
 | |
| 
 | |
| 
 | |
| def name_to_variable_name(name: str) -> str:
 | |
|     """Transform a backend name string into a string safe to use as variable name."""
 | |
|     name = re.sub(INVALID_CHARS_PATTERN, "", name)
 | |
|     name = re.sub(INVALID_LEADING_CHARS_PATTERN, "", name)
 | |
|     return name
 | |
| 
 | |
| 
 | |
| def name_to_strategy_variable_name(name: str) -> str:
 | |
|     """Transform a backend name string into a strategy variable name."""
 | |
|     return f"strategy_{name_to_variable_name(name)}"
 | |
| 
 | |
| 
 | |
| class DuplicateBackendNamesError(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| EnabledBackendsDependency = DependencyCallable[
 | |
|     Sequence[AuthenticationBackend[models.UP, models.ID]]
 | |
| ]
 | |
| 
 | |
| 
 | |
| class Authenticator(Generic[models.UP, models.ID]):
 | |
|     """
 | |
|     Provides dependency callables to retrieve authenticated user.
 | |
| 
 | |
|     It performs the authentication against a list of backends
 | |
|     defined by the end-developer. The first backend yielding a user wins.
 | |
|     If no backend yields a user, an HTTPException is raised.
 | |
| 
 | |
|     :param backends: List of authentication backends.
 | |
|     :param get_user_manager: User manager dependency callable.
 | |
|     """
 | |
| 
 | |
|     backends: Sequence[AuthenticationBackend[models.UP, models.ID]]
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         backends: Sequence[AuthenticationBackend[models.UP, models.ID]],
 | |
|         get_user_manager: UserManagerDependency[models.UP, models.ID],
 | |
|     ):
 | |
|         self.backends = backends
 | |
|         self.get_user_manager = get_user_manager
 | |
| 
 | |
|     def current_user_token(
 | |
|         self,
 | |
|         optional: bool = False,
 | |
|         active: bool = False,
 | |
|         verified: bool = False,
 | |
|         superuser: bool = False,
 | |
|         get_enabled_backends: Optional[
 | |
|             EnabledBackendsDependency[models.UP, models.ID]
 | |
|         ] = None,
 | |
|     ):
 | |
|         """
 | |
|         Return a dependency callable to retrieve currently authenticated user and token.
 | |
| 
 | |
|         :param optional: If `True`, `None` is returned if there is no authenticated user
 | |
|         or if it doesn't pass the other requirements.
 | |
|         Otherwise, throw `401 Unauthorized`. Defaults to `False`.
 | |
|         Otherwise, an exception is raised. Defaults to `False`.
 | |
|         :param active: If `True`, throw `401 Unauthorized` if
 | |
|         the authenticated user is inactive. Defaults to `False`.
 | |
|         :param verified: If `True`, throw `403 Forbidden` if
 | |
|         the authenticated user is not verified. Defaults to `False`.
 | |
|         :param superuser: If `True`, throw `403 Forbidden` if
 | |
|         the authenticated user is not a superuser. Defaults to `False`.
 | |
|         :param get_enabled_backends: Optional dependency callable returning
 | |
|         a list of enabled authentication backends.
 | |
|         Useful if you want to dynamically enable some authentication backends
 | |
|         based on external logic, like a configuration in database.
 | |
|         By default, all specified authentication backends are enabled.
 | |
|         Please not however that every backends will appear in the OpenAPI documentation,
 | |
|         as FastAPI resolves it statically.
 | |
|         """
 | |
|         signature = self._get_dependency_signature(get_enabled_backends)
 | |
| 
 | |
|         @with_signature(signature)
 | |
|         async def current_user_token_dependency(*args: Any, **kwargs: Any):
 | |
|             return await self._authenticate(
 | |
|                 *args,
 | |
|                 optional=optional,
 | |
|                 active=active,
 | |
|                 verified=verified,
 | |
|                 superuser=superuser,
 | |
|                 **kwargs,
 | |
|             )
 | |
| 
 | |
|         return current_user_token_dependency
 | |
| 
 | |
|     def current_user(
 | |
|         self,
 | |
|         optional: bool = False,
 | |
|         active: bool = False,
 | |
|         verified: bool = False,
 | |
|         superuser: bool = False,
 | |
|         get_enabled_backends: Optional[
 | |
|             EnabledBackendsDependency[models.UP, models.ID]
 | |
|         ] = None,
 | |
|     ):
 | |
|         """
 | |
|         Return a dependency callable to retrieve currently authenticated user.
 | |
| 
 | |
|         :param optional: If `True`, `None` is returned if there is no authenticated user
 | |
|         or if it doesn't pass the other requirements.
 | |
|         Otherwise, throw `401 Unauthorized`. Defaults to `False`.
 | |
|         Otherwise, an exception is raised. Defaults to `False`.
 | |
|         :param active: If `True`, throw `401 Unauthorized` if
 | |
|         the authenticated user is inactive. Defaults to `False`.
 | |
|         :param verified: If `True`, throw `403 Forbidden` if
 | |
|         the authenticated user is not verified. Defaults to `False`.
 | |
|         :param superuser: If `True`, throw `403 Forbidden` if
 | |
|         the authenticated user is not a superuser. Defaults to `False`.
 | |
|         :param get_enabled_backends: Optional dependency callable returning
 | |
|         a list of enabled authentication backends.
 | |
|         Useful if you want to dynamically enable some authentication backends
 | |
|         based on external logic, like a configuration in database.
 | |
|         By default, all specified authentication backends are enabled.
 | |
|         Please not however that every backends will appear in the OpenAPI documentation,
 | |
|         as FastAPI resolves it statically.
 | |
|         """
 | |
|         signature = self._get_dependency_signature(get_enabled_backends)
 | |
| 
 | |
|         @with_signature(signature)
 | |
|         async def current_user_dependency(*args: Any, **kwargs: Any):
 | |
|             user, _ = await self._authenticate(
 | |
|                 *args,
 | |
|                 optional=optional,
 | |
|                 active=active,
 | |
|                 verified=verified,
 | |
|                 superuser=superuser,
 | |
|                 **kwargs,
 | |
|             )
 | |
|             return user
 | |
| 
 | |
|         return current_user_dependency
 | |
| 
 | |
|     async def _authenticate(
 | |
|         self,
 | |
|         *args,
 | |
|         user_manager: BaseUserManager[models.UP, models.ID],
 | |
|         optional: bool = False,
 | |
|         active: bool = False,
 | |
|         verified: bool = False,
 | |
|         superuser: bool = False,
 | |
|         **kwargs,
 | |
|     ) -> tuple[Optional[models.UP], Optional[str]]:
 | |
|         user: Optional[models.UP] = None
 | |
|         token: Optional[str] = None
 | |
|         enabled_backends: Sequence[AuthenticationBackend[models.UP, models.ID]] = (
 | |
|             kwargs.get("enabled_backends", self.backends)
 | |
|         )
 | |
|         for backend in self.backends:
 | |
|             if backend in enabled_backends:
 | |
|                 token = kwargs[name_to_variable_name(backend.name)]
 | |
|                 strategy: Strategy[models.UP, models.ID] = kwargs[
 | |
|                     name_to_strategy_variable_name(backend.name)
 | |
|                 ]
 | |
|                 if token is not None:
 | |
|                     user = await strategy.read_token(token, user_manager)
 | |
|                     if user:
 | |
|                         break
 | |
| 
 | |
|         status_code = status.HTTP_401_UNAUTHORIZED
 | |
|         if user:
 | |
|             status_code = status.HTTP_403_FORBIDDEN
 | |
|             if active and not user.is_active:
 | |
|                 status_code = status.HTTP_401_UNAUTHORIZED
 | |
|                 user = None
 | |
|             elif (
 | |
|                 verified and not user.is_verified or superuser and not user.is_superuser
 | |
|             ):
 | |
|                 user = None
 | |
|         if not user and not optional:
 | |
|             raise HTTPException(status_code=status_code)
 | |
|         return user, token
 | |
| 
 | |
|     def _get_dependency_signature(
 | |
|         self, get_enabled_backends: Optional[EnabledBackendsDependency] = None
 | |
|     ) -> Signature:
 | |
|         """
 | |
|         Generate a dynamic signature for the current_user dependency.
 | |
| 
 | |
|         Here comes some blood magic 🧙♂️
 | |
|         Thank to "makefun", we are able to generate callable
 | |
|         with a dynamic number of dependencies at runtime.
 | |
|         This way, each security schemes are detected by the OpenAPI generator.
 | |
|         """
 | |
|         try:
 | |
|             parameters: list[Parameter] = [
 | |
|                 Parameter(
 | |
|                     name="user_manager",
 | |
|                     kind=Parameter.POSITIONAL_OR_KEYWORD,
 | |
|                     default=Depends(self.get_user_manager),
 | |
|                 )
 | |
|             ]
 | |
| 
 | |
|             for backend in self.backends:
 | |
|                 parameters += [
 | |
|                     Parameter(
 | |
|                         name=name_to_variable_name(backend.name),
 | |
|                         kind=Parameter.POSITIONAL_OR_KEYWORD,
 | |
|                         default=Depends(cast(Callable, backend.transport.scheme)),
 | |
|                     ),
 | |
|                     Parameter(
 | |
|                         name=name_to_strategy_variable_name(backend.name),
 | |
|                         kind=Parameter.POSITIONAL_OR_KEYWORD,
 | |
|                         default=Depends(backend.get_strategy),
 | |
|                     ),
 | |
|                 ]
 | |
| 
 | |
|             if get_enabled_backends is not None:
 | |
|                 parameters += [
 | |
|                     Parameter(
 | |
|                         name="enabled_backends",
 | |
|                         kind=Parameter.POSITIONAL_OR_KEYWORD,
 | |
|                         default=Depends(get_enabled_backends),
 | |
|                     )
 | |
|                 ]
 | |
|             return Signature(parameters)
 | |
|         except ValueError:
 | |
|             raise DuplicateBackendNamesError()
 | 
