Implement password validation mechanism (#632)

* Implement password validation mechanism

* Add invalid password reason

* Always pass user in password validator

* Add password validation documentation
This commit is contained in:
François Voron
2021-05-17 08:58:23 +02:00
committed by GitHub
parent 5b76d5d90a
commit 5267e605f4
18 changed files with 320 additions and 34 deletions

View File

@ -29,8 +29,9 @@ Add quickly a registration and authentication system to your [FastAPI](https://f
* [X] Extensible base user model * [X] Extensible base user model
* [X] Ready-to-use register, login, reset password and verify e-mail routes * [X] Ready-to-use register, login, reset password and verify e-mail routes
* [X] Ready-to-use OAuth2 flow * [X] Ready-to-use social OAuth2 login flow
* [X] Dependency callables to inject current user in route * [X] Dependency callables to inject current user in route
* [X] Pluggable password validation
* [X] Customizable database backend * [X] Customizable database backend
* [X] SQLAlchemy async backend included thanks to [encode/databases](https://www.encode.io/databases/) * [X] SQLAlchemy async backend included thanks to [encode/databases](https://www.encode.io/databases/)
* [X] MongoDB async backend included thanks to [mongodb/motor](https://github.com/mongodb/motor) * [X] MongoDB async backend included thanks to [mongodb/motor](https://github.com/mongodb/motor)

View File

@ -41,23 +41,6 @@ class UserDB(User, models.BaseUserDB):
You can of course add your own properties there to fit to your needs! You can of course add your own properties there to fit to your needs!
## Password validation
**FastAPI Users** doesn't provide a default password validation, but you can implement it easily with a [Pydantic validator](https://pydantic-docs.helpmanual.io/usage/validators/) on the `UserCreate` class. Here is a simple example to check if the password is at least six characters long:
```py
from fastapi_users import models
from pydantic import validator
class UserCreate(models.BaseUserCreate):
@validator('password')
def valid_password(cls, v: str):
if len(v) < 6:
raise ValueError('Password should be at least 6 characters')
return v
```
## Next steps ## Next steps
Depending on your database backend, the database configuration will differ a bit. Depending on your database backend, the database configuration will differ a bit.

View File

@ -0,0 +1,49 @@
# Password validation
FastAPI Users **doesn't have any password validation logic by default**. However, there is an argument on the `FastAPIUsers` class so that you can provide your own password validation function.
It'll be applied on each routes that need to validate the input password:
* At registration ([`/register`](../usage/routes.md#post-register))
* At password reset ([`/reset-password`](../usage/routes.md#post-reset-password))
* At profile update ([`/me`](../usage/routes.md#patch-me) and [`/{user_id}`](../usage/routes.md#patch-user_id))
## Configuration
The FastAPIUsers class accepts an optional keyword argument `validate_password`. It expects an async function which accepts in argument:
* `password` (`str`): the password to validate.
* `user` (`Union[UserRegister, User]`): user model which we are currently validating the password. Useful if you want to check that the password doesn't contain the name or the birthdate of the user for example.
This function should return `None` if the password is valid or raise `InvalidPasswordException` if not. This exception expects an argument `reason` telling why the password is invalid. It'll be part of the error response.
## Example
```py
from fastapi_users import FastAPIUsers, InvalidPasswordException
async def validate_password(
password: str,
user: Union[UserRegister, User]],
) -> None:
if len(password) < 8:
raise InvalidPasswordException(
reason="Password should be at least 8 characters"
)
if user.email in password:
raise InvalidPasswordException(
reason="Password should not contain e-mail"
)
fastapi_users = FastAPIUsers(
user_db,
[jwt_authentication],
User,
UserCreate,
UserUpdate,
UserDB,
validate_password=validate_password
)
```

View File

@ -84,6 +84,18 @@ Register a new user. Will call the `after_register` [handler](../configuration/r
} }
``` ```
!!! fail "`400 Bad Request`"
[Password validation](../configuration/password-validation.md) failed.
```json
{
"detail": {
"code": "RESET_PASSWORD_INVALID_PASSWORD",
"reason": "Password should be at least 3 characters"
}
}
```
## Reset password router ## Reset password router
### `POST /forgot-password` ### `POST /forgot-password`
@ -126,6 +138,18 @@ Reset a password. Requires the token generated by the `/forgot-password` route.
} }
``` ```
!!! fail "`400 Bad Request`"
[Password validation](../configuration/password-validation.md) failed.
```json
{
"detail": {
"code": "REGISTER_INVALID_PASSWORD",
"reason": "Password should be at least 3 characters"
}
}
```
## Verify router ## Verify router
### `POST /request-verify-token` ### `POST /request-verify-token`
@ -274,6 +298,19 @@ Update the current authenticated active user.
!!! fail "`401 Unauthorized`" !!! fail "`401 Unauthorized`"
Missing token or inactive user. Missing token or inactive user.
!!! fail "`400 Bad Request`"
[Password validation](../configuration/password-validation.md) failed.
```json
{
"detail": {
"code": "UPDATE_USER_INVALID_PASSWORD",
"reason": "Password should be at least 3 characters"
}
}
```
### `GET /{user_id}` ### `GET /{user_id}`
Return the user with id `user_id`. Return the user with id `user_id`.
@ -330,6 +367,18 @@ Update the user with id `user_id`.
!!! fail "`404 Not found`" !!! fail "`404 Not found`"
The user does not exist. The user does not exist.
!!! fail "`400 Bad Request`"
[Password validation](../configuration/password-validation.md) failed.
```json
{
"detail": {
"code": "UPDATE_USER_INVALID_PASSWORD",
"reason": "Password should be at least 3 characters"
}
}
```
### `DELETE /{user_id}` ### `DELETE /{user_id}`
Delete the user with id `user_id`. Delete the user with id `user_id`.

View File

@ -4,3 +4,4 @@ __version__ = "6.0.0"
from fastapi_users import models # noqa: F401 from fastapi_users import models # noqa: F401
from fastapi_users.fastapi_users import FastAPIUsers # noqa: F401 from fastapi_users.fastapi_users import FastAPIUsers # noqa: F401
from fastapi_users.user import InvalidPasswordException # noqa: F401

View File

@ -15,6 +15,7 @@ from fastapi_users.router import (
from fastapi_users.user import ( from fastapi_users.user import (
CreateUserProtocol, CreateUserProtocol,
GetUserProtocol, GetUserProtocol,
ValidatePasswordProtocol,
VerifyUserProtocol, VerifyUserProtocol,
get_create_user, get_create_user,
get_get_user, get_get_user,
@ -39,6 +40,8 @@ class FastAPIUsers:
:param user_create_model: Pydantic model for creating a user. :param user_create_model: Pydantic model for creating a user.
:param user_update_model: Pydantic model for updating a user. :param user_update_model: Pydantic model for updating a user.
:param user_db_model: Pydantic model of a DB representation of a user. :param user_db_model: Pydantic model of a DB representation of a user.
:param validate_password: Optional function to validate the password
at user registration, user update or password reset.
:attribute create_user: Helper function to create a user programmatically. :attribute create_user: Helper function to create a user programmatically.
:attribute current_user: Dependency callable getter to inject authenticated user :attribute current_user: Dependency callable getter to inject authenticated user
@ -56,6 +59,7 @@ class FastAPIUsers:
create_user: CreateUserProtocol create_user: CreateUserProtocol
verify_user: VerifyUserProtocol verify_user: VerifyUserProtocol
get_user: GetUserProtocol get_user: GetUserProtocol
validate_password: Optional[ValidatePasswordProtocol]
_user_model: Type[models.BaseUser] _user_model: Type[models.BaseUser]
_user_create_model: Type[models.BaseUserCreate] _user_create_model: Type[models.BaseUserCreate]
_user_update_model: Type[models.BaseUserUpdate] _user_update_model: Type[models.BaseUserUpdate]
@ -69,6 +73,7 @@ class FastAPIUsers:
user_create_model: Type[models.BaseUserCreate], user_create_model: Type[models.BaseUserCreate],
user_update_model: Type[models.BaseUserUpdate], user_update_model: Type[models.BaseUserUpdate],
user_db_model: Type[models.BaseUserDB], user_db_model: Type[models.BaseUserDB],
validate_password: Optional[ValidatePasswordProtocol] = None,
): ):
self.db = db self.db = db
self.authenticator = Authenticator(auth_backends, db) self.authenticator = Authenticator(auth_backends, db)
@ -83,6 +88,8 @@ class FastAPIUsers:
self.verify_user = get_verify_user(db) self.verify_user = get_verify_user(db)
self.get_user = get_get_user(db) self.get_user = get_get_user(db)
self.validate_password = validate_password
self.current_user = self.authenticator.current_user self.current_user = self.authenticator.current_user
self.get_current_user = self.authenticator.get_current_user self.get_current_user = self.authenticator.get_current_user
self.get_current_active_user = self.authenticator.get_current_active_user self.get_current_active_user = self.authenticator.get_current_active_user
@ -120,6 +127,7 @@ class FastAPIUsers:
self._user_model, self._user_model,
self._user_create_model, self._user_create_model,
after_register, after_register,
self.validate_password,
) )
def get_verify_router( def get_verify_router(
@ -176,6 +184,7 @@ class FastAPIUsers:
reset_password_token_lifetime_seconds, reset_password_token_lifetime_seconds,
after_forgot_password, after_forgot_password,
after_reset_password, after_reset_password,
self.validate_password,
) )
def get_auth_router( def get_auth_router(
@ -185,6 +194,8 @@ class FastAPIUsers:
Return an auth router for a given authentication backend. Return an auth router for a given authentication backend.
:param backend: The authentication backend instance. :param backend: The authentication backend instance.
:param requires_verification: Whether the authentication
require the user to be verified or not.
""" """
return get_auth_router( return get_auth_router(
backend, backend,
@ -232,6 +243,8 @@ class FastAPIUsers:
:param after_update: Optional function called :param after_update: Optional function called
after a successful user update. after a successful user update.
:param requires_verification: Whether the endpoints
require the users to be verified or not.
""" """
return get_users_router( return get_users_router(
self.db, self.db,
@ -241,4 +254,5 @@ class FastAPIUsers:
self.authenticator, self.authenticator,
after_update, after_update,
requires_verification, requires_verification,
self.validate_password,
) )

View File

@ -3,14 +3,17 @@ from typing import Callable
class ErrorCode: class ErrorCode:
REGISTER_INVALID_PASSWORD = "REGISTER_INVALID_PASSWORD"
REGISTER_USER_ALREADY_EXISTS = "REGISTER_USER_ALREADY_EXISTS" REGISTER_USER_ALREADY_EXISTS = "REGISTER_USER_ALREADY_EXISTS"
LOGIN_BAD_CREDENTIALS = "LOGIN_BAD_CREDENTIALS" LOGIN_BAD_CREDENTIALS = "LOGIN_BAD_CREDENTIALS"
LOGIN_USER_NOT_VERIFIED = "LOGIN_USER_NOT_VERIFIED" LOGIN_USER_NOT_VERIFIED = "LOGIN_USER_NOT_VERIFIED"
RESET_PASSWORD_BAD_TOKEN = "RESET_PASSWORD_BAD_TOKEN" RESET_PASSWORD_BAD_TOKEN = "RESET_PASSWORD_BAD_TOKEN"
RESET_PASSWORD_INVALID_PASSWORD = "RESET_PASSWORD_INVALID_PASSWORD"
VERIFY_USER_BAD_TOKEN = "VERIFY_USER_BAD_TOKEN" VERIFY_USER_BAD_TOKEN = "VERIFY_USER_BAD_TOKEN"
VERIFY_USER_ALREADY_VERIFIED = "VERIFY_USER_ALREADY_VERIFIED" VERIFY_USER_ALREADY_VERIFIED = "VERIFY_USER_ALREADY_VERIFIED"
VERIFY_USER_TOKEN_EXPIRED = "VERIFY_USER_TOKEN_EXPIRED" VERIFY_USER_TOKEN_EXPIRED = "VERIFY_USER_TOKEN_EXPIRED"
UPDATE_USER_EMAIL_ALREADY_EXISTS = "UPDATE_USER_EMAIL_ALREADY_EXISTS" UPDATE_USER_EMAIL_ALREADY_EXISTS = "UPDATE_USER_EMAIL_ALREADY_EXISTS"
UPDATE_USER_INVALID_PASSWORD = "UPDATE_USER_INVALID_PASSWORD"
async def run_handler(handler: Callable, *args, **kwargs): async def run_handler(handler: Callable, *args, **kwargs):

View File

@ -1,10 +1,15 @@
from typing import Callable, Optional, Type from typing import Callable, Optional, Type, cast
from fastapi import APIRouter, HTTPException, Request, status from fastapi import APIRouter, HTTPException, Request, status
from fastapi_users import models from fastapi_users import models
from fastapi_users.router.common import ErrorCode, run_handler from fastapi_users.router.common import ErrorCode, run_handler
from fastapi_users.user import CreateUserProtocol, UserAlreadyExists from fastapi_users.user import (
CreateUserProtocol,
InvalidPasswordException,
UserAlreadyExists,
ValidatePasswordProtocol,
)
def get_register_router( def get_register_router(
@ -12,6 +17,7 @@ def get_register_router(
user_model: Type[models.BaseUser], user_model: Type[models.BaseUser],
user_create_model: Type[models.BaseUserCreate], user_create_model: Type[models.BaseUserCreate],
after_register: Optional[Callable[[models.UD, Request], None]] = None, after_register: Optional[Callable[[models.UD, Request], None]] = None,
validate_password: Optional[ValidatePasswordProtocol] = None,
) -> APIRouter: ) -> APIRouter:
"""Generate a router with the register route.""" """Generate a router with the register route."""
router = APIRouter() router = APIRouter()
@ -20,6 +26,19 @@ def get_register_router(
"/register", response_model=user_model, status_code=status.HTTP_201_CREATED "/register", response_model=user_model, status_code=status.HTTP_201_CREATED
) )
async def register(request: Request, user: user_create_model): # type: ignore async def register(request: Request, user: user_create_model): # type: ignore
user = cast(models.BaseUserCreate, user) # Prevent mypy complain
if validate_password:
try:
await validate_password(user.password, user)
except InvalidPasswordException as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"code": ErrorCode.REGISTER_INVALID_PASSWORD,
"reason": e.reason,
},
)
try: try:
created_user = await create_user(user, safe=True) created_user = await create_user(user, safe=True)
except UserAlreadyExists: except UserAlreadyExists:

View File

@ -8,6 +8,7 @@ from fastapi_users import models
from fastapi_users.db import BaseUserDatabase from fastapi_users.db import BaseUserDatabase
from fastapi_users.password import get_password_hash from fastapi_users.password import get_password_hash
from fastapi_users.router.common import ErrorCode, run_handler from fastapi_users.router.common import ErrorCode, run_handler
from fastapi_users.user import InvalidPasswordException, ValidatePasswordProtocol
from fastapi_users.utils import JWT_ALGORITHM, generate_jwt from fastapi_users.utils import JWT_ALGORITHM, generate_jwt
RESET_PASSWORD_TOKEN_AUDIENCE = "fastapi-users:reset" RESET_PASSWORD_TOKEN_AUDIENCE = "fastapi-users:reset"
@ -19,6 +20,7 @@ def get_reset_password_router(
reset_password_token_lifetime_seconds: int = 3600, reset_password_token_lifetime_seconds: int = 3600,
after_forgot_password: Optional[Callable[[models.UD, str, Request], None]] = None, after_forgot_password: Optional[Callable[[models.UD, str, Request], None]] = None,
after_reset_password: Optional[Callable[[models.UD, Request], None]] = None, after_reset_password: Optional[Callable[[models.UD, Request], None]] = None,
validate_password: Optional[ValidatePasswordProtocol] = None,
) -> APIRouter: ) -> APIRouter:
"""Generate a router with the reset password routes.""" """Generate a router with the reset password routes."""
router = APIRouter() router = APIRouter()
@ -74,6 +76,18 @@ def get_reset_password_router(
detail=ErrorCode.RESET_PASSWORD_BAD_TOKEN, detail=ErrorCode.RESET_PASSWORD_BAD_TOKEN,
) )
if validate_password:
try:
await validate_password(password, user)
except InvalidPasswordException as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"code": ErrorCode.RESET_PASSWORD_INVALID_PASSWORD,
"reason": e.reason,
},
)
user.hashed_password = get_password_hash(password) user.hashed_password = get_password_hash(password)
await user_db.update(user) await user_db.update(user)
if after_reset_password: if after_reset_password:

View File

@ -8,6 +8,7 @@ from fastapi_users.authentication import Authenticator
from fastapi_users.db import BaseUserDatabase from fastapi_users.db import BaseUserDatabase
from fastapi_users.password import get_password_hash from fastapi_users.password import get_password_hash
from fastapi_users.router.common import ErrorCode, run_handler from fastapi_users.router.common import ErrorCode, run_handler
from fastapi_users.user import InvalidPasswordException, ValidatePasswordProtocol
def get_users_router( def get_users_router(
@ -18,6 +19,7 @@ def get_users_router(
authenticator: Authenticator, authenticator: Authenticator,
after_update: Optional[Callable[[models.UD, Dict[str, Any], Request], None]] = None, after_update: Optional[Callable[[models.UD, Dict[str, Any], Request], None]] = None,
requires_verification: bool = False, requires_verification: bool = False,
validate_password: Optional[ValidatePasswordProtocol] = None,
) -> APIRouter: ) -> APIRouter:
"""Generate a router with the authentication routes.""" """Generate a router with the authentication routes."""
router = APIRouter() router = APIRouter()
@ -54,7 +56,10 @@ def get_users_router(
): ):
for field in update_dict: for field in update_dict:
if field == "password": if field == "password":
hashed_password = get_password_hash(update_dict[field]) password = update_dict[field]
if validate_password:
await validate_password(password, user)
hashed_password = get_password_hash(password)
user.hashed_password = hashed_password user.hashed_password = hashed_password
else: else:
setattr(user, field, update_dict[field]) setattr(user, field, update_dict[field])
@ -84,9 +89,17 @@ def get_users_router(
updated_user, updated_user,
) # Prevent mypy complain ) # Prevent mypy complain
updated_user_data = updated_user.create_update_dict() updated_user_data = updated_user.create_update_dict()
updated_user = await _update_user(user, updated_user_data, request)
return updated_user try:
return await _update_user(user, updated_user_data, request)
except InvalidPasswordException as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"code": ErrorCode.UPDATE_USER_INVALID_PASSWORD,
"reason": e.reason,
},
)
@router.get( @router.get(
"/{id:uuid}", "/{id:uuid}",
@ -110,7 +123,17 @@ def get_users_router(
) # Prevent mypy complain ) # Prevent mypy complain
user = await _get_or_404(id) user = await _get_or_404(id)
updated_user_data = updated_user.create_update_dict_superuser() updated_user_data = updated_user.create_update_dict_superuser()
return await _update_user(user, updated_user_data, request)
try:
return await _update_user(user, updated_user_data, request)
except InvalidPasswordException as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"code": ErrorCode.UPDATE_USER_INVALID_PASSWORD,
"reason": e.reason,
},
)
@router.delete( @router.delete(
"/{id:uuid}", "/{id:uuid}",

View File

@ -1,4 +1,4 @@
from typing import Awaitable, Type from typing import Any, Awaitable, Union, Type
try: try:
from typing import Protocol from typing import Protocol
@ -12,18 +12,34 @@ from fastapi_users.db import BaseUserDatabase
from fastapi_users.password import get_password_hash from fastapi_users.password import get_password_hash
class UserAlreadyExists(Exception): class FastAPIUsersException(Exception):
pass pass
class UserNotExists(Exception): class UserAlreadyExists(FastAPIUsersException):
pass pass
class UserAlreadyVerified(Exception): class UserNotExists(FastAPIUsersException):
pass pass
class UserAlreadyVerified(FastAPIUsersException):
pass
class InvalidPasswordException(FastAPIUsersException):
def __init__(self, reason: Any) -> None:
self.reason = reason
class ValidatePasswordProtocol(Protocol): # pragma: no cover
def __call__(
self, password: str, user: Union[models.BaseUserCreate, models.BaseUserDB]
) -> Awaitable[None]:
pass
class CreateUserProtocol(Protocol): # pragma: no cover class CreateUserProtocol(Protocol): # pragma: no cover
def __call__( def __call__(
self, self,

View File

@ -56,7 +56,8 @@ nav:
- configuration/routers/reset.md - configuration/routers/reset.md
- configuration/routers/users.md - configuration/routers/users.md
- configuration/routers/verify.md - configuration/routers/verify.md
- configuration/full_example.md - configuration/password-validation.md
- configuration/full-example.md
- configuration/oauth.md - configuration/oauth.md
- Usage: - Usage:
- usage/flow.md - usage/flow.md

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
from typing import AsyncGenerator, List, Optional from typing import AsyncGenerator, List, Optional
import asynctest
import httpx import httpx
import pytest import pytest
from asgi_lifespan import LifespanManager from asgi_lifespan import LifespanManager
@ -15,6 +16,7 @@ from fastapi_users.authentication import Authenticator, BaseAuthentication
from fastapi_users.db import BaseUserDatabase from fastapi_users.db import BaseUserDatabase
from fastapi_users.models import BaseOAuthAccount, BaseOAuthAccountMixin, BaseUserDB from fastapi_users.models import BaseOAuthAccount, BaseOAuthAccountMixin, BaseUserDB
from fastapi_users.password import get_password_hash from fastapi_users.password import get_password_hash
from fastapi_users.user import InvalidPasswordException, ValidatePasswordProtocol
guinevere_password_hash = get_password_hash("guinevere") guinevere_password_hash = get_password_hash("guinevere")
angharad_password_hash = get_password_hash("angharad") angharad_password_hash = get_password_hash("angharad")
@ -400,3 +402,14 @@ def oauth_client() -> OAuth2:
ACCESS_TOKEN_ENDPOINT, ACCESS_TOKEN_ENDPOINT,
name="service1", name="service1",
) )
@pytest.fixture
def validate_password() -> ValidatePasswordProtocol:
async def _validate_password(password: str, user: models.UD) -> None:
if len(password) < 3:
raise InvalidPasswordException(
reason="Password should be at least 3 characters"
)
return asynctest.CoroutineMock(wraps=_validate_password)

View File

@ -11,7 +11,7 @@ from tests.conftest import User, UserCreate, UserDB, UserUpdate
@pytest.fixture @pytest.fixture
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_app_client( async def test_app_client(
mock_user_db, mock_authentication, oauth_client, get_test_client mock_user_db, mock_authentication, oauth_client, get_test_client, validate_password
) -> AsyncGenerator[httpx.AsyncClient, None]: ) -> AsyncGenerator[httpx.AsyncClient, None]:
fastapi_users = FastAPIUsers( fastapi_users = FastAPIUsers(
mock_user_db, mock_user_db,
@ -20,6 +20,7 @@ async def test_app_client(
UserCreate, UserCreate,
UserUpdate, UserUpdate,
UserDB, UserDB,
validate_password,
) )
app = FastAPI() app = FastAPI()

View File

@ -30,7 +30,7 @@ def after_register(request):
@pytest.fixture @pytest.fixture
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_app_client( async def test_app_client(
mock_user_db, after_register, get_test_client mock_user_db, after_register, get_test_client, validate_password
) -> AsyncGenerator[httpx.AsyncClient, None]: ) -> AsyncGenerator[httpx.AsyncClient, None]:
create_user = get_create_user(mock_user_db, UserDB) create_user = get_create_user(mock_user_db, UserDB)
register_router = get_register_router( register_router = get_register_router(
@ -38,6 +38,7 @@ async def test_app_client(
User, User,
UserCreate, UserCreate,
after_register, after_register,
validate_password,
) )
app = FastAPI() app = FastAPI()
@ -79,6 +80,22 @@ class TestRegister:
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
assert after_register.called is False assert after_register.called is False
async def test_invalid_password(
self, test_app_client: httpx.AsyncClient, after_register, validate_password
):
json = {"email": "king.arthur@camelot.bt", "password": "g"}
response = await test_app_client.post("/register", json=json)
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = cast(Dict[str, Any], response.json())
assert data["detail"] == {
"code": ErrorCode.REGISTER_INVALID_PASSWORD,
"reason": "Password should be at least 3 characters",
}
validate_password.assert_called_with(
"g", UserCreate(email="king.arthur@camelot.bt", password="g")
)
assert after_register.called is False
@pytest.mark.parametrize( @pytest.mark.parametrize(
"email", ["king.arthur@camelot.bt", "King.Arthur@camelot.bt"] "email", ["king.arthur@camelot.bt", "King.Arthur@camelot.bt"]
) )

View File

@ -56,13 +56,18 @@ def after_reset_password(request):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_app_client( async def test_app_client(
mock_user_db, mock_user_db,
mock_authentication,
after_forgot_password, after_forgot_password,
after_reset_password, after_reset_password,
get_test_client, get_test_client,
validate_password,
) -> AsyncGenerator[httpx.AsyncClient, None]: ) -> AsyncGenerator[httpx.AsyncClient, None]:
reset_router = get_reset_password_router( reset_router = get_reset_password_router(
mock_user_db, SECRET, LIFETIME, after_forgot_password, after_reset_password mock_user_db,
SECRET,
LIFETIME,
after_forgot_password,
after_reset_password,
validate_password,
) )
app = FastAPI() app = FastAPI()
@ -217,6 +222,33 @@ class TestResetPassword:
assert mock_user_db.update.called is False assert mock_user_db.update.called is False
assert after_reset_password.called is False assert after_reset_password.called is False
async def test_invalid_password(
self,
mocker,
mock_user_db,
test_app_client: httpx.AsyncClient,
forgot_password_token,
user: UserDB,
after_reset_password,
validate_password,
):
mocker.spy(mock_user_db, "update")
json = {
"token": forgot_password_token(user.id),
"password": "h",
}
response = await test_app_client.post("/reset-password", json=json)
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = cast(Dict[str, Any], response.json())
assert data["detail"] == {
"code": ErrorCode.RESET_PASSWORD_INVALID_PASSWORD,
"reason": "Password should be at least 3 characters",
}
validate_password.assert_called_with("h", user)
assert mock_user_db.update.called is False
assert after_reset_password.called is False
async def test_existing_user( async def test_existing_user(
self, self,
mocker, mocker,

View File

@ -28,7 +28,7 @@ def after_update(request):
@pytest.fixture @pytest.fixture
def app_factory(mock_user_db, mock_authentication, after_update): def app_factory(mock_user_db, mock_authentication, after_update, validate_password):
def _app_factory(requires_verification: bool) -> FastAPI: def _app_factory(requires_verification: bool) -> FastAPI:
mock_authentication_bis = MockAuthentication(name="mock-bis") mock_authentication_bis = MockAuthentication(name="mock-bis")
authenticator = Authenticator( authenticator = Authenticator(
@ -43,6 +43,7 @@ def app_factory(mock_user_db, mock_authentication, after_update):
authenticator, authenticator,
after_update, after_update,
requires_verification=requires_verification, requires_verification=requires_verification,
validate_password=validate_password,
) )
app = FastAPI() app = FastAPI()
@ -166,6 +167,32 @@ class TestUpdateMe:
assert data["detail"] == ErrorCode.UPDATE_USER_EMAIL_ALREADY_EXISTS assert data["detail"] == ErrorCode.UPDATE_USER_EMAIL_ALREADY_EXISTS
assert after_update.called is False assert after_update.called is False
async def test_invalid_password(
self,
test_app_client: Tuple[httpx.AsyncClient, bool],
user: UserDB,
after_update,
validate_password,
):
client, requires_verification = test_app_client
response = await client.patch(
"/me",
json={"password": "m"},
headers={"Authorization": f"Bearer {user.id}"},
)
if requires_verification:
assert response.status_code == status.HTTP_401_UNAUTHORIZED
assert after_update.called is False
else:
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = cast(Dict[str, Any], response.json())
assert data["detail"] == {
"code": ErrorCode.UPDATE_USER_INVALID_PASSWORD,
"reason": "Password should be at least 3 characters",
}
validate_password.assert_called_with("m", user)
assert after_update.called is False
async def test_empty_body( async def test_empty_body(
self, self,
test_app_client: Tuple[httpx.AsyncClient, bool], test_app_client: Tuple[httpx.AsyncClient, bool],
@ -726,6 +753,29 @@ class TestUpdateUser:
assert data["detail"] == ErrorCode.UPDATE_USER_EMAIL_ALREADY_EXISTS assert data["detail"] == ErrorCode.UPDATE_USER_EMAIL_ALREADY_EXISTS
assert after_update.called is False assert after_update.called is False
async def test_invalid_password_verified_superuser(
self,
test_app_client: Tuple[httpx.AsyncClient, bool],
user: UserDB,
verified_superuser: UserDB,
after_update,
validate_password,
):
client, _ = test_app_client
response = await client.patch(
f"/{user.id}",
json={"password": "m"},
headers={"Authorization": f"Bearer {verified_superuser.id}"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
data = cast(Dict[str, Any], response.json())
assert data["detail"] == {
"code": ErrorCode.UPDATE_USER_INVALID_PASSWORD,
"reason": "Password should be at least 3 characters",
}
validate_password.assert_called_with("m", user)
assert after_update.called is False
async def test_valid_body_verified_superuser( async def test_valid_body_verified_superuser(
self, self,
test_app_client: Tuple[httpx.AsyncClient, bool], test_app_client: Tuple[httpx.AsyncClient, bool],