Files
fastapi-users/tests/test_manager.py
2023-08-26 10:14:26 +02:00

731 lines
25 KiB
Python

import uuid
from typing import Callable
import pytest
from fastapi.security import OAuth2PasswordRequestForm
from pytest_mock import MockerFixture
from fastapi_users.exceptions import (
InvalidID,
InvalidPasswordException,
InvalidResetPasswordToken,
InvalidVerifyToken,
UserAlreadyExists,
UserAlreadyVerified,
UserInactive,
UserNotExists,
)
from fastapi_users.jwt import decode_jwt, generate_jwt
from fastapi_users.manager import IntegerIDMixin
from tests.conftest import (
UserCreate,
UserManagerMock,
UserModel,
UserOAuthModel,
UserUpdate,
)
@pytest.fixture
def verify_token(user_manager: UserManagerMock[UserModel]):
    """Provide a factory that builds verification JWTs for the tests.

    The returned callable takes optional ``user_id`` and ``email`` values plus a
    ``lifetime`` that defaults to the manager's
    ``verification_token_lifetime_seconds``. A claim whose value is ``None`` is
    left out of the payload; ``user_id`` is stringified into ``sub``.
    """
    default_lifetime = user_manager.verification_token_lifetime_seconds

    def _verify_token(user_id=None, email=None, lifetime=default_lifetime):
        claims = {"aud": user_manager.verification_token_audience}
        if user_id is not None:
            claims.update(sub=str(user_id))
        if email is not None:
            claims.update(email=email)
        secret = user_manager.verification_token_secret
        return generate_jwt(claims, secret, lifetime)

    return _verify_token
@pytest.fixture
def forgot_password_token(user_manager: UserManagerMock[UserModel]):
    """Provide a factory that builds reset-password JWTs for the tests.

    The returned callable takes an optional ``user_id`` (stringified into
    ``sub``), an optional ``current_password_hash`` (run through
    ``user_manager.password_helper.hash`` and stored as ``password_fgpt``) and a
    ``lifetime`` defaulting to the manager's
    ``reset_password_token_lifetime_seconds``.
    """
    default_lifetime = user_manager.reset_password_token_lifetime_seconds

    def _forgot_password_token(
        user_id=None, current_password_hash=None, lifetime=default_lifetime
    ):
        claims = {"aud": user_manager.reset_password_token_audience}
        if user_id is not None:
            claims.update(sub=str(user_id))
        if current_password_hash is not None:
            fingerprint = user_manager.password_helper.hash(current_password_hash)
            claims.update(password_fgpt=fingerprint)
        secret = user_manager.reset_password_token_secret
        return generate_jwt(claims, secret, lifetime)

    return _forgot_password_token
@pytest.fixture
def create_oauth2_password_request_form() -> (
    Callable[[str, str], OAuth2PasswordRequestForm]
):
    """Provide a factory building an ``OAuth2PasswordRequestForm``.

    The factory takes a username and a password; ``scope`` is always the empty
    string.
    """

    def _create_oauth2_password_request_form(username, password):
        form_fields = {"username": username, "password": password, "scope": ""}
        return OAuth2PasswordRequestForm(**form_fields)

    return _create_oauth2_password_request_form
@pytest.mark.asyncio
@pytest.mark.manager
class TestGet:
    """Tests for ``user_manager.get``."""

    async def test_not_existing_user(self, user_manager: UserManagerMock[UserModel]):
        """The hard-coded UUID below is expected to raise ``UserNotExists``."""
        missing_id = uuid.UUID("d35d213e-f3d8-4f08-954a-7e0d1bea286f")
        with pytest.raises(UserNotExists):
            await user_manager.get(missing_id)

    async def test_existing_user(
        self, user_manager: UserManagerMock[UserModel], user: UserModel
    ):
        """Fetching by the fixture user's ID returns a user with that ID."""
        found = await user_manager.get(user.id)
        assert found.id == user.id
@pytest.mark.asyncio
@pytest.mark.manager
class TestGetByEmail:
    """Tests for ``user_manager.get_by_email``."""

    async def test_not_existing_user(self, user_manager: UserManagerMock[UserModel]):
        """The address used below is expected to raise ``UserNotExists``."""
        missing_email = "lancelot@camelot.bt"
        with pytest.raises(UserNotExists):
            await user_manager.get_by_email(missing_email)

    async def test_existing_user(
        self, user_manager: UserManagerMock[UserModel], user: UserModel
    ):
        """Fetching by the fixture user's e-mail returns a user with its ID."""
        found = await user_manager.get_by_email(user.email)
        assert found.id == user.id
@pytest.mark.asyncio
@pytest.mark.manager
class TestGetByOAuthAccount:
    """Tests for ``user_manager_oauth.get_by_oauth_account``.

    The ``user_manager_oauth`` fixture is annotated with ``UserOAuthModel`` so
    that it agrees with the other OAuth test classes in this module.
    """

    async def test_not_existing_user(
        self, user_manager_oauth: UserManagerMock[UserOAuthModel]
    ):
        """The pair ``("service1", "foo")`` is expected to raise ``UserNotExists``."""
        with pytest.raises(UserNotExists):
            await user_manager_oauth.get_by_oauth_account("service1", "foo")

    async def test_existing_user(
        self,
        user_manager_oauth: UserManagerMock[UserOAuthModel],
        user_oauth: UserOAuthModel,
    ):
        """The first linked account of ``user_oauth`` resolves to that user."""
        oauth_account = user_oauth.oauth_accounts[0]
        retrieved_user = await user_manager_oauth.get_by_oauth_account(
            oauth_account.oauth_name, oauth_account.account_id
        )
        assert retrieved_user.id == user_oauth.id
@pytest.mark.asyncio
@pytest.mark.manager
class TestCreateUser:
    """Tests for ``user_manager.create``.

    Exact type checks use ``type(...) is UserModel`` (identity) instead of
    ``==``, which is the idiomatic form for an exact-class comparison.
    """

    @pytest.mark.parametrize(
        "email", ["king.arthur@camelot.bt", "King.Arthur@camelot.bt"]
    )
    async def test_existing_user(
        self, email: str, user_manager: UserManagerMock[UserModel]
    ):
        """Both e-mail spellings must raise ``UserAlreadyExists``."""
        user = UserCreate(email=email, password="guinevere")
        with pytest.raises(UserAlreadyExists):
            await user_manager.create(user)
        # The registration hook must not run when creation is rejected.
        assert user_manager.on_after_register.called is False

    @pytest.mark.parametrize("email", ["lancelot@camelot.bt", "Lancelot@camelot.bt"])
    async def test_regular_user(
        self, email: str, user_manager: UserManagerMock[UserModel]
    ):
        """Creation returns a ``UserModel`` and triggers ``on_after_register``."""
        user = UserCreate(email=email, password="guinevere")
        created_user = await user_manager.create(user)
        assert type(created_user) is UserModel
        assert user_manager.on_after_register.called is True

    @pytest.mark.parametrize("safe,result", [(True, False), (False, True)])
    async def test_superuser(
        self, user_manager: UserManagerMock[UserModel], safe: bool, result: bool
    ):
        """``is_superuser=True`` is expected to be kept only when ``safe`` is false."""
        user = UserCreate(
            email="lancelot@camelot.b", password="guinevere", is_superuser=True
        )
        created_user = await user_manager.create(user, safe)
        assert type(created_user) is UserModel
        assert created_user.is_superuser is result
        assert user_manager.on_after_register.called is True

    @pytest.mark.parametrize("safe,result", [(True, True), (False, False)])
    async def test_is_active(
        self, user_manager: UserManagerMock[UserModel], safe: bool, result: bool
    ):
        """``is_active=False`` is expected to be kept only when ``safe`` is false."""
        user = UserCreate(
            email="lancelot@camelot.b", password="guinevere", is_active=False
        )
        created_user = await user_manager.create(user, safe)
        assert type(created_user) is UserModel
        assert created_user.is_active is result
        assert user_manager.on_after_register.called is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestOAuthCallback:
    """Tests for ``user_manager_oauth.oauth_callback``."""

    async def test_existing_user_with_oauth(
        self,
        user_manager_oauth: UserManagerMock[UserOAuthModel],
        user_oauth: UserOAuthModel,
    ):
        """The first account's token is updated; the second stays as it was."""
        known_account = user_oauth.oauth_accounts[0]

        user = await user_manager_oauth.oauth_callback(
            known_account.oauth_name,
            "UPDATED_TOKEN",
            known_account.account_id,
            known_account.account_email,
        )

        assert user.id == user_oauth.id
        accounts = user.oauth_accounts
        assert len(accounts) == 2
        assert accounts[0].id == known_account.id
        assert accounts[0].oauth_name == "service1"
        assert accounts[0].access_token == "UPDATED_TOKEN"
        assert accounts[1].access_token == "TOKEN"
        assert accounts[1].oauth_name == "service2"
        assert user_manager_oauth.on_after_register.called is False

    async def test_existing_user_without_oauth_associate(
        self,
        user_manager_oauth: UserManagerMock[UserOAuthModel],
        superuser_oauth: UserOAuthModel,
    ):
        """With ``associate_by_email=True`` the account is linked to the user."""
        user = await user_manager_oauth.oauth_callback(
            "service1",
            "TOKEN",
            "superuser_oauth1",
            superuser_oauth.email,
            1579000751,
            associate_by_email=True,
        )

        assert user.id == superuser_oauth.id
        accounts = user.oauth_accounts
        assert len(accounts) == 1
        assert accounts[0].id is not None
        assert user_manager_oauth.on_after_register.called is False

    async def test_existing_user_without_oauth_no_associate(
        self,
        user_manager_oauth: UserManagerMock[UserOAuthModel],
        superuser_oauth: UserOAuthModel,
    ):
        """With ``associate_by_email=False`` the call raises ``UserAlreadyExists``."""
        with pytest.raises(UserAlreadyExists):
            await user_manager_oauth.oauth_callback(
                "service1",
                "TOKEN",
                "superuser_oauth1",
                superuser_oauth.email,
                1579000751,
                associate_by_email=False,
            )

    async def test_new_user(self, user_manager_oauth: UserManagerMock[UserOAuthModel]):
        """A new e-mail yields an unverified user and fires the register hook."""
        user = await user_manager_oauth.oauth_callback(
            "service1", "TOKEN", "new_user_oauth1", "galahad@camelot.bt", 1579000751
        )

        assert user.email == "galahad@camelot.bt"
        accounts = user.oauth_accounts
        assert len(accounts) == 1
        assert accounts[0].id is not None
        assert user.is_verified is False
        assert user_manager_oauth.on_after_register.called is True

    async def test_new_user_is_verified_by_default(
        self, user_manager_oauth: UserManagerMock[UserOAuthModel]
    ):
        """``is_verified_by_default=True`` yields a verified new user."""
        user = await user_manager_oauth.oauth_callback(
            "service1",
            "TOKEN",
            "new_user_oauth1",
            "galahad@camelot.bt",
            1579000751,
            is_verified_by_default=True,
        )

        assert user.email == "galahad@camelot.bt"
        accounts = user.oauth_accounts
        assert len(accounts) == 1
        assert accounts[0].id is not None
        assert user.is_verified is True
        assert user_manager_oauth.on_after_register.called is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestOAuthAssociateCallback:
    """Tests for ``user_manager_oauth.oauth_associate_callback``."""

    async def test_existing_user_without_oauth_associate(
        self,
        user_manager_oauth: UserManagerMock[UserOAuthModel],
        superuser_oauth: UserOAuthModel,
    ):
        """Associating an account returns the same user with one linked account."""
        user = await user_manager_oauth.oauth_associate_callback(
            superuser_oauth,
            "service1",
            "TOKEN",
            "superuser_oauth1",
            superuser_oauth.email,
            1579000751,
        )
        # Compare against the fixture user: the previous ``user.id == user.id``
        # was a tautology and could never fail.
        assert user.id == superuser_oauth.id
        assert len(user.oauth_accounts) == 1
        assert user.oauth_accounts[0].id is not None
        assert user_manager_oauth.on_after_update.called is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestRequestVerifyUser:
    """Tests for ``user_manager.request_verify``."""

    async def test_user_inactive(
        self, user_manager: UserManagerMock[UserModel], inactive_user: UserModel
    ):
        """The ``inactive_user`` fixture is expected to raise ``UserInactive``."""
        with pytest.raises(UserInactive):
            await user_manager.request_verify(inactive_user)

    async def test_user_verified(
        self, user_manager: UserManagerMock[UserModel], verified_user: UserModel
    ):
        """The ``verified_user`` fixture is expected to raise ``UserAlreadyVerified``."""
        with pytest.raises(UserAlreadyVerified):
            await user_manager.request_verify(verified_user)

    async def test_user_active_not_verified(
        self, user_manager: UserManagerMock[UserModel], user: UserModel
    ):
        """The hook receives the user and a token carrying ``sub`` and ``email``."""
        await user_manager.request_verify(user)

        hook = user_manager.on_after_request_verify
        assert hook.called is True
        positional = hook.call_args[0]
        hook_user = positional[0]
        hook_token = positional[1]
        assert hook_user.id == user.id

        claims = decode_jwt(
            hook_token,
            user_manager.verification_token_secret,
            audience=[user_manager.verification_token_audience],
        )
        assert claims["sub"] == str(user.id)
        assert claims["email"] == str(user.email)
@pytest.mark.asyncio
@pytest.mark.manager
class TestVerifyUser:
    """Tests for ``user_manager.verify``.

    Tokens are built *before* entering ``pytest.raises`` so that only the
    ``verify`` call itself can satisfy the expected exception; an error raised
    while building the token now fails the test instead of being mistaken for
    the expected outcome.
    """

    async def test_invalid_token(self, user_manager: UserManagerMock[UserModel]):
        """A string that is not a JWT raises ``InvalidVerifyToken``."""
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify("foo")

    async def test_token_expired(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """A token generated with ``lifetime=-1`` is rejected."""
        token = verify_token(user_id=user.id, email=user.email, lifetime=-1)
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_missing_user_id(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """A token without a ``sub`` claim is rejected."""
        token = verify_token(email=user.email)
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_missing_user_email(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """A token without an ``email`` claim is rejected."""
        token = verify_token(user_id=user.id)
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_invalid_user_id(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """A token whose ``sub`` is ``"foo"`` is rejected."""
        token = verify_token(user_id="foo", email=user.email)
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_invalid_email(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """A token whose ``email`` is ``"foo"`` is rejected."""
        token = verify_token(user_id=user.id, email="foo")
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_email_id_mismatch(
        self,
        user_manager: UserManagerMock[UserModel],
        user: UserModel,
        inactive_user: UserModel,
        verify_token,
    ):
        """A token pairing one user's ID with another user's e-mail is rejected."""
        token = verify_token(user_id=user.id, email=inactive_user.email)
        with pytest.raises(InvalidVerifyToken):
            await user_manager.verify(token)

    async def test_verified_user(
        self,
        user_manager: UserManagerMock[UserModel],
        verified_user: UserModel,
        verify_token,
    ):
        """A valid token for ``verified_user`` raises ``UserAlreadyVerified``."""
        token = verify_token(user_id=verified_user.id, email=verified_user.email)
        with pytest.raises(UserAlreadyVerified):
            await user_manager.verify(token)

    async def test_inactive_user(
        self,
        user_manager: UserManagerMock[UserModel],
        inactive_user: UserModel,
        verify_token,
    ):
        """Verifying ``inactive_user`` sets ``is_verified`` but not ``is_active``."""
        token = verify_token(user_id=inactive_user.id, email=inactive_user.email)
        verified_user = await user_manager.verify(token)
        assert verified_user.is_verified is True
        assert verified_user.is_active is False

    async def test_active_user(
        self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
    ):
        """Verifying ``user`` returns a verified, active user."""
        token = verify_token(user_id=user.id, email=user.email)
        verified_user = await user_manager.verify(token)
        assert verified_user.is_verified is True
        assert verified_user.is_active is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestForgotPassword:
    """Tests for ``user_manager.forgot_password``."""

    async def test_user_inactive(
        self, user_manager: UserManagerMock[UserModel], inactive_user: UserModel
    ):
        """``inactive_user`` raises ``UserInactive`` and the hook is not called."""
        with pytest.raises(UserInactive):
            await user_manager.forgot_password(inactive_user)
        assert user_manager.on_after_forgot_password.called is False

    async def test_user_active(
        self, user_manager: UserManagerMock[UserModel], user: UserModel
    ):
        """The hook gets the user and a token with ``sub`` and ``password_fgpt``."""
        await user_manager.forgot_password(user)

        hook = user_manager.on_after_forgot_password
        assert hook.called is True
        positional = hook.call_args[0]
        hook_user = positional[0]
        hook_token = positional[1]
        assert hook_user.id == user.id

        claims = decode_jwt(
            hook_token,
            user_manager.reset_password_token_secret,
            audience=[user_manager.reset_password_token_audience],
        )
        assert claims["sub"] == str(user.id)

        # Only the first element of the returned pair is checked.
        verification = user_manager.password_helper.verify_and_update(
            user.hashed_password, claims["password_fgpt"]
        )
        valid_fingerprint, _ = verification
        assert valid_fingerprint is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestResetPassword:
    """Tests for ``user_manager.reset_password``.

    Tokens are generated *before* entering ``pytest.raises`` so that only the
    ``reset_password`` call can satisfy the expected exception; a failure in
    the token factory now surfaces as a test error rather than a false pass.
    """

    async def test_invalid_token(self, user_manager: UserManagerMock[UserModel]):
        """A string that is not a JWT raises ``InvalidResetPasswordToken``."""
        with pytest.raises(InvalidResetPasswordToken):
            await user_manager.reset_password("foo", "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    async def test_token_expired(
        self,
        user_manager: UserManagerMock[UserModel],
        user: UserModel,
        forgot_password_token,
    ):
        """A token generated with ``lifetime=-1`` is rejected."""
        token = forgot_password_token(
            user.id, current_password_hash=user.hashed_password, lifetime=-1
        )
        with pytest.raises(InvalidResetPasswordToken):
            await user_manager.reset_password(token, "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    @pytest.mark.parametrize("user_id", [None, "foo"])
    async def test_valid_token_bad_payload(
        self,
        user_id: str,
        user_manager: UserManagerMock[UserModel],
        forgot_password_token,
    ):
        """A token with a missing or ``"foo"`` ``sub`` claim is rejected."""
        token = forgot_password_token(user_id, current_password_hash="old_password")
        with pytest.raises(InvalidResetPasswordToken):
            await user_manager.reset_password(token, "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    async def test_not_existing_user(
        self, user_manager: UserManagerMock[UserModel], forgot_password_token
    ):
        """A token for the hard-coded UUID below raises ``UserNotExists``."""
        token = forgot_password_token(
            "d35d213e-f3d8-4f08-954a-7e0d1bea286f",
            current_password_hash="old_password",
        )
        with pytest.raises(UserNotExists):
            await user_manager.reset_password(token, "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    async def test_already_used_token(
        self,
        user: UserModel,
        user_manager: UserManagerMock[UserModel],
        forgot_password_token,
    ):
        """A fingerprint built from ``"old_password"`` is rejected for ``user``."""
        token = forgot_password_token(user.id, current_password_hash="old_password")
        with pytest.raises(InvalidResetPasswordToken):
            await user_manager.reset_password(token, "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    async def test_inactive_user(
        self,
        inactive_user: UserModel,
        user_manager: UserManagerMock[UserModel],
        forgot_password_token,
    ):
        """A valid token for ``inactive_user`` raises ``UserInactive``."""
        token = forgot_password_token(
            inactive_user.id,
            current_password_hash=inactive_user.hashed_password,
        )
        with pytest.raises(UserInactive):
            await user_manager.reset_password(token, "guinevere")
        assert user_manager._update.called is False
        assert user_manager.on_after_reset_password.called is False

    async def test_invalid_password(
        self,
        user: UserModel,
        user_manager: UserManagerMock[UserModel],
        forgot_password_token,
    ):
        """The new password ``"h"`` raises ``InvalidPasswordException``."""
        token = forgot_password_token(
            user.id, current_password_hash=user.hashed_password
        )
        with pytest.raises(InvalidPasswordException):
            await user_manager.reset_password(token, "h")
        assert user_manager.on_after_reset_password.called is False

    async def test_valid_user_password(
        self,
        user: UserModel,
        user_manager: UserManagerMock[UserModel],
        forgot_password_token,
    ):
        """A valid token updates the password and fires the hook with the user."""
        token = forgot_password_token(
            user.id, current_password_hash=user.hashed_password
        )
        await user_manager.reset_password(token, "holygrail")

        assert user_manager._update.called is True
        # Second positional argument of ``_update`` is the update dictionary.
        update_dict = user_manager._update.call_args[0][1]
        assert update_dict == {"password": "holygrail"}

        assert user_manager.on_after_reset_password.called is True
        actual_user = user_manager.on_after_reset_password.call_args[0][0]
        assert actual_user.id == user.id
@pytest.mark.asyncio
@pytest.mark.manager
class TestUpdateUser:
    """Tests for ``user_manager.update``."""

    async def test_safe_update(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """With ``safe=True`` the name changes but ``is_superuser`` stays false."""
        changes = UserUpdate(first_name="Arthur", is_superuser=True)
        result = await user_manager.update(changes, user, safe=True)
        assert result.first_name == "Arthur"
        assert result.is_superuser is False
        assert user_manager.on_after_update.called is True

    async def test_unsafe_update(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """With ``safe=False`` both the name and ``is_superuser`` are applied."""
        changes = UserUpdate(first_name="Arthur", is_superuser=True)
        result = await user_manager.update(changes, user, safe=False)
        assert result.first_name == "Arthur"
        assert result.is_superuser is True
        assert user_manager.on_after_update.called is True

    async def test_unsafe_update_password_unchanged(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """``password=None`` leaves the stored hash untouched."""
        # Capture the hash before the update call.
        hash_before = user.hashed_password
        changes = UserUpdate(password=None)
        result = await user_manager.update(changes, user, safe=False)
        assert result.hashed_password == hash_before
        assert user_manager.on_after_update.called is True

    async def test_password_update_invalid(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """The password ``"h"`` raises ``InvalidPasswordException``; no hook call."""
        changes = UserUpdate(password="h")
        with pytest.raises(InvalidPasswordException):
            await user_manager.update(changes, user, safe=True)
        assert user_manager.on_after_update.called is False

    async def test_password_update_valid(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """An accepted new password changes the stored hash."""
        # Capture the hash before the update call.
        hash_before = user.hashed_password
        changes = UserUpdate(password="holygrail")
        result = await user_manager.update(changes, user, safe=True)
        assert result.hashed_password != hash_before
        assert user_manager.on_after_update.called is True

    async def test_email_update_already_existing(
        self,
        user: UserModel,
        superuser: UserModel,
        user_manager: UserManagerMock[UserModel],
    ):
        """Switching to ``superuser``'s e-mail raises ``UserAlreadyExists``."""
        changes = UserUpdate(email=superuser.email)
        with pytest.raises(UserAlreadyExists):
            await user_manager.update(changes, user, safe=True)
        assert user_manager.on_after_update.called is False

    async def test_email_update_with_same_email(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """Re-submitting the user's own e-mail succeeds and fires the hook."""
        changes = UserUpdate(email=user.email)
        result = await user_manager.update(changes, user, safe=True)
        assert result.email == user.email
        assert user_manager.on_after_update.called is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestDelete:
    """Tests for ``user_manager.delete``."""

    async def test_delete(
        self, user: UserModel, user_manager: UserManagerMock[UserModel]
    ):
        """Deleting a user calls the before-delete and after-delete hooks."""
        await user_manager.delete(user)
        before_hook = user_manager.on_before_delete
        after_hook = user_manager.on_after_delete
        assert before_hook.called is True
        assert after_hook.called is True
@pytest.mark.asyncio
@pytest.mark.manager
class TestAuthenticate:
    """Tests for ``user_manager.authenticate``."""

    async def test_unknown_user(
        self,
        create_oauth2_password_request_form: Callable[
            [str, str], OAuth2PasswordRequestForm
        ],
        user_manager: UserManagerMock[UserModel],
    ):
        """The credentials below are expected to authenticate to ``None``."""
        credentials = create_oauth2_password_request_form(
            "lancelot@camelot.bt", "guinevere"
        )
        authenticated = await user_manager.authenticate(credentials)
        assert authenticated is None

    async def test_wrong_password(
        self,
        create_oauth2_password_request_form: Callable[
            [str, str], OAuth2PasswordRequestForm
        ],
        user_manager: UserManagerMock[UserModel],
    ):
        """The password ``"percival"`` is expected to authenticate to ``None``."""
        credentials = create_oauth2_password_request_form(
            "king.arthur@camelot.bt", "percival"
        )
        authenticated = await user_manager.authenticate(credentials)
        assert authenticated is None

    async def test_valid_credentials(
        self,
        create_oauth2_password_request_form: Callable[
            [str, str], OAuth2PasswordRequestForm
        ],
        user_manager: UserManagerMock[UserModel],
    ):
        """The credentials below return the user with the matching e-mail."""
        credentials = create_oauth2_password_request_form(
            "king.arthur@camelot.bt", "guinevere"
        )
        authenticated = await user_manager.authenticate(credentials)
        assert authenticated is not None
        assert authenticated.email == "king.arthur@camelot.bt"

    async def test_upgrade_password_hash(
        self,
        mocker: MockerFixture,
        create_oauth2_password_request_form: Callable[
            [str, str], OAuth2PasswordRequestForm
        ],
        user_manager: UserManagerMock[UserModel],
    ):
        """When the helper reports an updated hash, ``user_db.update`` is called."""
        # Force the password helper to report success plus a replacement hash.
        mocker.patch.object(
            user_manager.password_helper,
            "verify_and_update",
            return_value=(True, "updated_hash"),
        )
        update_spy = mocker.spy(user_manager.user_db, "update")

        credentials = create_oauth2_password_request_form(
            "king.arthur@camelot.bt", "guinevere"
        )
        authenticated = await user_manager.authenticate(credentials)

        assert authenticated is not None
        assert authenticated.email == "king.arthur@camelot.bt"
        assert update_spy.called is True
def test_integer_id_mixin():
    """Check which inputs ``IntegerIDMixin.parse_id`` accepts and rejects.

    ``"123"`` and ``123`` must both parse to ``123``; ``"123.42"``, ``123.42``
    and ``"abc"`` must each raise ``InvalidID``.
    """
    mixin = IntegerIDMixin()

    for accepted in ("123", 123):
        assert mixin.parse_id(accepted) == 123

    for rejected in ("123.42", 123.42, "abc"):
        with pytest.raises(InvalidID):
            mixin.parse_id(rejected)