mirror of
https://github.com/fastapi-users/fastapi-users.git
synced 2025-08-14 18:58:10 +08:00
700 lines
24 KiB
Python
700 lines
24 KiB
Python
import uuid
|
|
from typing import Any, Dict, Generic, Optional, Union
|
|
|
|
import jwt
|
|
from fastapi import Request
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
|
|
from fastapi_users import exceptions, models, schemas
|
|
from fastapi_users.db import BaseUserDatabase
|
|
from fastapi_users.jwt import SecretType, decode_jwt, generate_jwt
|
|
from fastapi_users.password import PasswordHelper, PasswordHelperProtocol
|
|
from fastapi_users.types import DependencyCallable
|
|
|
|
RESET_PASSWORD_TOKEN_AUDIENCE = "fastapi-users:reset"
|
|
VERIFY_USER_TOKEN_AUDIENCE = "fastapi-users:verify"
|
|
|
|
|
|
class BaseUserManager(Generic[models.UP, models.ID]):
|
|
"""
|
|
User management logic.
|
|
|
|
:attribute reset_password_token_secret: Secret to encode reset password token.
|
|
:attribute reset_password_token_lifetime_seconds: Lifetime of reset password token.
|
|
:attribute reset_password_token_audience: JWT audience of reset password token.
|
|
:attribute verification_token_secret: Secret to encode verification token.
|
|
:attribute verification_token_lifetime_seconds: Lifetime of verification token.
|
|
:attribute verification_token_audience: JWT audience of verification token.
|
|
|
|
:param user_db: Database adapter instance.
|
|
"""
|
|
|
|
reset_password_token_secret: SecretType
|
|
reset_password_token_lifetime_seconds: int = 3600
|
|
reset_password_token_audience: str = RESET_PASSWORD_TOKEN_AUDIENCE
|
|
|
|
verification_token_secret: SecretType
|
|
verification_token_lifetime_seconds: int = 3600
|
|
verification_token_audience: str = VERIFY_USER_TOKEN_AUDIENCE
|
|
|
|
user_db: BaseUserDatabase[models.UP, models.ID]
|
|
password_helper: PasswordHelperProtocol
|
|
|
|
def __init__(
|
|
self,
|
|
user_db: BaseUserDatabase[models.UP, models.ID],
|
|
password_helper: Optional[PasswordHelperProtocol] = None,
|
|
):
|
|
self.user_db = user_db
|
|
if password_helper is None:
|
|
self.password_helper = PasswordHelper()
|
|
else:
|
|
self.password_helper = password_helper # pragma: no cover
|
|
|
|
def parse_id(self, value: Any) -> models.ID:
|
|
"""
|
|
Parse a value into a correct models.ID instance.
|
|
|
|
:param value: The value to parse.
|
|
:raises InvalidID: The models.ID value is invalid.
|
|
:return: An models.ID object.
|
|
"""
|
|
raise NotImplementedError() # pragma: no cover
|
|
|
|
async def get(self, id: models.ID) -> models.UP:
|
|
"""
|
|
Get a user by id.
|
|
|
|
:param id: Id. of the user to retrieve.
|
|
:raises UserNotExists: The user does not exist.
|
|
:return: A user.
|
|
"""
|
|
user = await self.user_db.get(id)
|
|
|
|
if user is None:
|
|
raise exceptions.UserNotExists()
|
|
|
|
return user
|
|
|
|
async def get_by_email(self, user_email: str) -> models.UP:
|
|
"""
|
|
Get a user by e-mail.
|
|
|
|
:param user_email: E-mail of the user to retrieve.
|
|
:raises UserNotExists: The user does not exist.
|
|
:return: A user.
|
|
"""
|
|
user = await self.user_db.get_by_email(user_email)
|
|
|
|
if user is None:
|
|
raise exceptions.UserNotExists()
|
|
|
|
return user
|
|
|
|
async def get_by_oauth_account(self, oauth: str, account_id: str) -> models.UP:
|
|
"""
|
|
Get a user by OAuth account.
|
|
|
|
:param oauth: Name of the OAuth client.
|
|
:param account_id: Id. of the account on the external OAuth service.
|
|
:raises UserNotExists: The user does not exist.
|
|
:return: A user.
|
|
"""
|
|
user = await self.user_db.get_by_oauth_account(oauth, account_id)
|
|
|
|
if user is None:
|
|
raise exceptions.UserNotExists()
|
|
|
|
return user
|
|
|
|
async def create(
|
|
self,
|
|
user_create: schemas.UC,
|
|
safe: bool = False,
|
|
request: Optional[Request] = None,
|
|
) -> models.UP:
|
|
"""
|
|
Create a user in database.
|
|
|
|
Triggers the on_after_register handler on success.
|
|
|
|
:param user_create: The UserCreate model to create.
|
|
:param safe: If True, sensitive values like is_superuser or is_verified
|
|
will be ignored during the creation, defaults to False.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:raises UserAlreadyExists: A user already exists with the same e-mail.
|
|
:return: A new user.
|
|
"""
|
|
await self.validate_password(user_create.password, user_create)
|
|
|
|
existing_user = await self.user_db.get_by_email(user_create.email)
|
|
if existing_user is not None:
|
|
raise exceptions.UserAlreadyExists()
|
|
|
|
user_dict = (
|
|
user_create.create_update_dict()
|
|
if safe
|
|
else user_create.create_update_dict_superuser()
|
|
)
|
|
password = user_dict.pop("password")
|
|
user_dict["hashed_password"] = self.password_helper.hash(password)
|
|
|
|
created_user = await self.user_db.create(user_dict)
|
|
|
|
await self.on_after_register(created_user, request)
|
|
|
|
return created_user
|
|
|
|
async def oauth_callback(
|
|
self: "BaseUserManager[models.UOAP, models.ID]",
|
|
oauth_name: str,
|
|
access_token: str,
|
|
account_id: str,
|
|
account_email: str,
|
|
expires_at: Optional[int] = None,
|
|
refresh_token: Optional[str] = None,
|
|
request: Optional[Request] = None,
|
|
*,
|
|
associate_by_email: bool = False,
|
|
is_verified_by_default: bool = False,
|
|
) -> models.UOAP:
|
|
"""
|
|
Handle the callback after a successful OAuth authentication.
|
|
|
|
If the user already exists with this OAuth account, the token is updated.
|
|
|
|
If a user with the same e-mail already exists and `associate_by_email` is True,
|
|
the OAuth account is associated to this user.
|
|
Otherwise, the `UserNotExists` exception is raised.
|
|
|
|
If the user does not exist, it is created and the on_after_register handler
|
|
is triggered.
|
|
|
|
:param oauth_name: Name of the OAuth client.
|
|
:param access_token: Valid access token for the service provider.
|
|
:param account_id: models.ID of the user on the service provider.
|
|
:param account_email: E-mail of the user on the service provider.
|
|
:param expires_at: Optional timestamp at which the access token expires.
|
|
:param refresh_token: Optional refresh token to get a
|
|
fresh access token from the service provider.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None
|
|
:param associate_by_email: If True, any existing user with the same
|
|
e-mail address will be associated to this user. Defaults to False.
|
|
:param is_verified_by_default: If True, the `is_verified` flag will be
|
|
set to `True` on newly created user. Make sure the OAuth Provider you're
|
|
using does verify the email address before enabling this flag.
|
|
Defaults to False.
|
|
:return: A user.
|
|
"""
|
|
oauth_account_dict = {
|
|
"oauth_name": oauth_name,
|
|
"access_token": access_token,
|
|
"account_id": account_id,
|
|
"account_email": account_email,
|
|
"expires_at": expires_at,
|
|
"refresh_token": refresh_token,
|
|
}
|
|
|
|
try:
|
|
user = await self.get_by_oauth_account(oauth_name, account_id)
|
|
except exceptions.UserNotExists:
|
|
try:
|
|
# Associate account
|
|
user = await self.get_by_email(account_email)
|
|
if not associate_by_email:
|
|
raise exceptions.UserAlreadyExists()
|
|
user = await self.user_db.add_oauth_account(user, oauth_account_dict)
|
|
except exceptions.UserNotExists:
|
|
# Create account
|
|
password = self.password_helper.generate()
|
|
user_dict = {
|
|
"email": account_email,
|
|
"hashed_password": self.password_helper.hash(password),
|
|
"is_verified": is_verified_by_default,
|
|
}
|
|
user = await self.user_db.create(user_dict)
|
|
user = await self.user_db.add_oauth_account(user, oauth_account_dict)
|
|
await self.on_after_register(user, request)
|
|
else:
|
|
# Update oauth
|
|
for existing_oauth_account in user.oauth_accounts:
|
|
if (
|
|
existing_oauth_account.account_id == account_id
|
|
and existing_oauth_account.oauth_name == oauth_name
|
|
):
|
|
user = await self.user_db.update_oauth_account(
|
|
user, existing_oauth_account, oauth_account_dict
|
|
)
|
|
|
|
return user
|
|
|
|
async def oauth_associate_callback(
|
|
self: "BaseUserManager[models.UOAP, models.ID]",
|
|
user: models.UOAP,
|
|
oauth_name: str,
|
|
access_token: str,
|
|
account_id: str,
|
|
account_email: str,
|
|
expires_at: Optional[int] = None,
|
|
refresh_token: Optional[str] = None,
|
|
request: Optional[Request] = None,
|
|
) -> models.UOAP:
|
|
"""
|
|
Handle the callback after a successful OAuth association.
|
|
|
|
We add this new OAuth account to the given user.
|
|
|
|
:param oauth_name: Name of the OAuth client.
|
|
:param access_token: Valid access token for the service provider.
|
|
:param account_id: models.ID of the user on the service provider.
|
|
:param account_email: E-mail of the user on the service provider.
|
|
:param expires_at: Optional timestamp at which the access token expires.
|
|
:param refresh_token: Optional refresh token to get a
|
|
fresh access token from the service provider.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None
|
|
:return: A user.
|
|
"""
|
|
oauth_account_dict = {
|
|
"oauth_name": oauth_name,
|
|
"access_token": access_token,
|
|
"account_id": account_id,
|
|
"account_email": account_email,
|
|
"expires_at": expires_at,
|
|
"refresh_token": refresh_token,
|
|
}
|
|
|
|
user = await self.user_db.add_oauth_account(user, oauth_account_dict)
|
|
|
|
await self.on_after_update(user, {}, request)
|
|
|
|
return user
|
|
|
|
async def request_verify(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Start a verification request.
|
|
|
|
Triggers the on_after_request_verify handler on success.
|
|
|
|
:param user: The user to verify.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:raises UserInactive: The user is inactive.
|
|
:raises UserAlreadyVerified: The user is already verified.
|
|
"""
|
|
if not user.is_active:
|
|
raise exceptions.UserInactive()
|
|
if user.is_verified:
|
|
raise exceptions.UserAlreadyVerified()
|
|
|
|
token_data = {
|
|
"sub": str(user.id),
|
|
"email": user.email,
|
|
"aud": self.verification_token_audience,
|
|
}
|
|
token = generate_jwt(
|
|
token_data,
|
|
self.verification_token_secret,
|
|
self.verification_token_lifetime_seconds,
|
|
)
|
|
await self.on_after_request_verify(user, token, request)
|
|
|
|
async def verify(self, token: str, request: Optional[Request] = None) -> models.UP:
|
|
"""
|
|
Validate a verification request.
|
|
|
|
Changes the is_verified flag of the user to True.
|
|
|
|
Triggers the on_after_verify handler on success.
|
|
|
|
:param token: The verification token generated by request_verify.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:raises InvalidVerifyToken: The token is invalid or expired.
|
|
:raises UserAlreadyVerified: The user is already verified.
|
|
:return: The verified user.
|
|
"""
|
|
try:
|
|
data = decode_jwt(
|
|
token,
|
|
self.verification_token_secret,
|
|
[self.verification_token_audience],
|
|
)
|
|
except jwt.PyJWTError:
|
|
raise exceptions.InvalidVerifyToken()
|
|
|
|
try:
|
|
user_id = data["sub"]
|
|
email = data["email"]
|
|
except KeyError:
|
|
raise exceptions.InvalidVerifyToken()
|
|
|
|
try:
|
|
user = await self.get_by_email(email)
|
|
except exceptions.UserNotExists:
|
|
raise exceptions.InvalidVerifyToken()
|
|
|
|
try:
|
|
parsed_id = self.parse_id(user_id)
|
|
except exceptions.InvalidID:
|
|
raise exceptions.InvalidVerifyToken()
|
|
|
|
if parsed_id != user.id:
|
|
raise exceptions.InvalidVerifyToken()
|
|
|
|
if user.is_verified:
|
|
raise exceptions.UserAlreadyVerified()
|
|
|
|
verified_user = await self._update(user, {"is_verified": True})
|
|
|
|
await self.on_after_verify(verified_user, request)
|
|
|
|
return verified_user
|
|
|
|
async def forgot_password(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Start a forgot password request.
|
|
|
|
Triggers the on_after_forgot_password handler on success.
|
|
|
|
:param user: The user that forgot its password.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:raises UserInactive: The user is inactive.
|
|
"""
|
|
if not user.is_active:
|
|
raise exceptions.UserInactive()
|
|
|
|
token_data = {
|
|
"sub": str(user.id),
|
|
"password_fgpt": self.password_helper.hash(user.hashed_password),
|
|
"aud": self.reset_password_token_audience,
|
|
}
|
|
token = generate_jwt(
|
|
token_data,
|
|
self.reset_password_token_secret,
|
|
self.reset_password_token_lifetime_seconds,
|
|
)
|
|
await self.on_after_forgot_password(user, token, request)
|
|
|
|
async def reset_password(
|
|
self, token: str, password: str, request: Optional[Request] = None
|
|
) -> models.UP:
|
|
"""
|
|
Reset the password of a user.
|
|
|
|
Triggers the on_after_reset_password handler on success.
|
|
|
|
:param token: The token generated by forgot_password.
|
|
:param password: The new password to set.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:raises InvalidResetPasswordToken: The token is invalid or expired.
|
|
:raises UserInactive: The user is inactive.
|
|
:raises InvalidPasswordException: The password is invalid.
|
|
:return: The user with updated password.
|
|
"""
|
|
try:
|
|
data = decode_jwt(
|
|
token,
|
|
self.reset_password_token_secret,
|
|
[self.reset_password_token_audience],
|
|
)
|
|
except jwt.PyJWTError:
|
|
raise exceptions.InvalidResetPasswordToken()
|
|
|
|
try:
|
|
user_id = data["sub"]
|
|
password_fingerprint = data["password_fgpt"]
|
|
except KeyError:
|
|
raise exceptions.InvalidResetPasswordToken()
|
|
|
|
try:
|
|
parsed_id = self.parse_id(user_id)
|
|
except exceptions.InvalidID:
|
|
raise exceptions.InvalidResetPasswordToken()
|
|
|
|
user = await self.get(parsed_id)
|
|
|
|
valid_password_fingerprint, _ = self.password_helper.verify_and_update(
|
|
user.hashed_password, password_fingerprint
|
|
)
|
|
if not valid_password_fingerprint:
|
|
raise exceptions.InvalidResetPasswordToken()
|
|
|
|
if not user.is_active:
|
|
raise exceptions.UserInactive()
|
|
|
|
updated_user = await self._update(user, {"password": password})
|
|
|
|
await self.on_after_reset_password(user, request)
|
|
|
|
return updated_user
|
|
|
|
async def update(
|
|
self,
|
|
user_update: schemas.UU,
|
|
user: models.UP,
|
|
safe: bool = False,
|
|
request: Optional[Request] = None,
|
|
) -> models.UP:
|
|
"""
|
|
Update a user.
|
|
|
|
Triggers the on_after_update handler on success
|
|
|
|
:param user_update: The UserUpdate model containing
|
|
the changes to apply to the user.
|
|
:param user: The current user to update.
|
|
:param safe: If True, sensitive values like is_superuser or is_verified
|
|
will be ignored during the update, defaults to False
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
:return: The updated user.
|
|
"""
|
|
if safe:
|
|
updated_user_data = user_update.create_update_dict()
|
|
else:
|
|
updated_user_data = user_update.create_update_dict_superuser()
|
|
updated_user = await self._update(user, updated_user_data)
|
|
await self.on_after_update(updated_user, updated_user_data, request)
|
|
return updated_user
|
|
|
|
async def delete(
|
|
self,
|
|
user: models.UP,
|
|
request: Optional[Request] = None,
|
|
) -> None:
|
|
"""
|
|
Delete a user.
|
|
|
|
:param user: The user to delete.
|
|
"""
|
|
await self.on_before_delete(user, request)
|
|
await self.user_db.delete(user)
|
|
await self.on_after_delete(user, request)
|
|
|
|
async def validate_password(
|
|
self, password: str, user: Union[schemas.UC, models.UP]
|
|
) -> None:
|
|
"""
|
|
Validate a password.
|
|
|
|
*You should overload this method to add your own validation logic.*
|
|
|
|
:param password: The password to validate.
|
|
:param user: The user associated to this password.
|
|
:raises InvalidPasswordException: The password is invalid.
|
|
:return: None if the password is valid.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_register(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful user registration.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The registered user
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_update(
|
|
self,
|
|
user: models.UP,
|
|
update_dict: Dict[str, Any],
|
|
request: Optional[Request] = None,
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful user update.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The updated user
|
|
:param update_dict: Dictionary with the updated user fields.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_request_verify(
|
|
self, user: models.UP, token: str, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful verification request.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user to verify.
|
|
:param token: The verification token.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_verify(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful user verification.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The verified user.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_forgot_password(
|
|
self, user: models.UP, token: str, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful forgot password request.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user that forgot its password.
|
|
:param token: The forgot password token.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_reset_password(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after successful password reset.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user that reset its password.
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_login(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic after user login.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user that is logging in
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_before_delete(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic before user delete.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user to be deleted
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def on_after_delete(
|
|
self, user: models.UP, request: Optional[Request] = None
|
|
) -> None:
|
|
"""
|
|
Perform logic before user delete.
|
|
|
|
*You should overload this method to add your own logic.*
|
|
|
|
:param user: The user to be deleted
|
|
:param request: Optional FastAPI request that
|
|
triggered the operation, defaults to None.
|
|
"""
|
|
return # pragma: no cover
|
|
|
|
async def authenticate(
|
|
self, credentials: OAuth2PasswordRequestForm
|
|
) -> Optional[models.UP]:
|
|
"""
|
|
Authenticate and return a user following an email and a password.
|
|
|
|
Will automatically upgrade password hash if necessary.
|
|
|
|
:param credentials: The user credentials.
|
|
"""
|
|
try:
|
|
user = await self.get_by_email(credentials.username)
|
|
except exceptions.UserNotExists:
|
|
# Run the hasher to mitigate timing attack
|
|
# Inspired from Django: https://code.djangoproject.com/ticket/20760
|
|
self.password_helper.hash(credentials.password)
|
|
return None
|
|
|
|
verified, updated_password_hash = self.password_helper.verify_and_update(
|
|
credentials.password, user.hashed_password
|
|
)
|
|
if not verified:
|
|
return None
|
|
# Update password hash to a more robust one if needed
|
|
if updated_password_hash is not None:
|
|
await self.user_db.update(user, {"hashed_password": updated_password_hash})
|
|
|
|
return user
|
|
|
|
async def _update(self, user: models.UP, update_dict: Dict[str, Any]) -> models.UP:
|
|
validated_update_dict = {}
|
|
for field, value in update_dict.items():
|
|
if field == "email" and value != user.email:
|
|
try:
|
|
await self.get_by_email(value)
|
|
raise exceptions.UserAlreadyExists()
|
|
except exceptions.UserNotExists:
|
|
validated_update_dict["email"] = value
|
|
validated_update_dict["is_verified"] = False
|
|
elif field == "password":
|
|
await self.validate_password(value, user)
|
|
validated_update_dict["hashed_password"] = self.password_helper.hash(
|
|
value
|
|
)
|
|
else:
|
|
validated_update_dict[field] = value
|
|
return await self.user_db.update(user, validated_update_dict)
|
|
|
|
|
|
class UUIDIDMixin:
|
|
def parse_id(self, value: Any) -> uuid.UUID:
|
|
if isinstance(value, uuid.UUID):
|
|
return value
|
|
try:
|
|
return uuid.UUID(value)
|
|
except ValueError as e:
|
|
raise exceptions.InvalidID() from e
|
|
|
|
|
|
class IntegerIDMixin:
|
|
def parse_id(self, value: Any) -> int:
|
|
if isinstance(value, float):
|
|
raise exceptions.InvalidID()
|
|
try:
|
|
return int(value)
|
|
except ValueError as e:
|
|
raise exceptions.InvalidID() from e
|
|
|
|
|
|
UserManagerDependency = DependencyCallable[BaseUserManager[models.UP, models.ID]]
|