mirror of
				https://github.com/fastapi-users/fastapi-users.git
				synced 2025-11-01 01:48:46 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			661 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			661 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Callable
 | |
| 
 | |
| import pytest
 | |
| from fastapi.security import OAuth2PasswordRequestForm
 | |
| from pydantic import UUID4
 | |
| from pytest_mock import MockerFixture
 | |
| 
 | |
| from fastapi_users.exceptions import (
 | |
|     InvalidID,
 | |
|     InvalidPasswordException,
 | |
|     InvalidResetPasswordToken,
 | |
|     InvalidVerifyToken,
 | |
|     UserAlreadyExists,
 | |
|     UserAlreadyVerified,
 | |
|     UserInactive,
 | |
|     UserNotExists,
 | |
| )
 | |
| from fastapi_users.jwt import decode_jwt, generate_jwt
 | |
| from fastapi_users.manager import IntegerIDMixin
 | |
| from tests.conftest import (
 | |
|     UserCreate,
 | |
|     UserManagerMock,
 | |
|     UserModel,
 | |
|     UserOAuthModel,
 | |
|     UserUpdate,
 | |
| )
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def verify_token(user_manager: UserManagerMock[UserModel]):
 | |
|     def _verify_token(
 | |
|         user_id=None,
 | |
|         email=None,
 | |
|         lifetime=user_manager.verification_token_lifetime_seconds,
 | |
|     ):
 | |
|         data = {"aud": user_manager.verification_token_audience}
 | |
|         if user_id is not None:
 | |
|             data["user_id"] = str(user_id)
 | |
|         if email is not None:
 | |
|             data["email"] = email
 | |
|         return generate_jwt(data, user_manager.verification_token_secret, lifetime)
 | |
| 
 | |
|     return _verify_token
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def forgot_password_token(user_manager: UserManagerMock[UserModel]):
 | |
|     def _forgot_password_token(
 | |
|         user_id=None, lifetime=user_manager.reset_password_token_lifetime_seconds
 | |
|     ):
 | |
|         data = {"aud": user_manager.reset_password_token_audience}
 | |
|         if user_id is not None:
 | |
|             data["user_id"] = str(user_id)
 | |
|         return generate_jwt(data, user_manager.reset_password_token_secret, lifetime)
 | |
| 
 | |
|     return _forgot_password_token
 | |
| 
 | |
| 
 | |
| @pytest.fixture
 | |
| def create_oauth2_password_request_form() -> Callable[
 | |
|     [str, str], OAuth2PasswordRequestForm
 | |
| ]:
 | |
|     def _create_oauth2_password_request_form(username, password):
 | |
|         return OAuth2PasswordRequestForm(username=username, password=password, scope="")
 | |
| 
 | |
|     return _create_oauth2_password_request_form
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestGet:
 | |
|     async def test_not_existing_user(self, user_manager: UserManagerMock[UserModel]):
 | |
|         with pytest.raises(UserNotExists):
 | |
|             await user_manager.get(UUID4("d35d213e-f3d8-4f08-954a-7e0d1bea286f"))
 | |
| 
 | |
|     async def test_existing_user(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel
 | |
|     ):
 | |
|         retrieved_user = await user_manager.get(user.id)
 | |
|         assert retrieved_user.id == user.id
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestGetByEmail:
 | |
|     async def test_not_existing_user(self, user_manager: UserManagerMock[UserModel]):
 | |
|         with pytest.raises(UserNotExists):
 | |
|             await user_manager.get_by_email("lancelot@camelot.bt")
 | |
| 
 | |
|     async def test_existing_user(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel
 | |
|     ):
 | |
|         retrieved_user = await user_manager.get_by_email(user.email)
 | |
|         assert retrieved_user.id == user.id
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestGetByOAuthAccount:
 | |
|     async def test_not_existing_user(
 | |
|         self, user_manager_oauth: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         with pytest.raises(UserNotExists):
 | |
|             await user_manager_oauth.get_by_oauth_account("service1", "foo")
 | |
| 
 | |
|     async def test_existing_user(
 | |
|         self, user_manager_oauth: UserManagerMock[UserModel], user_oauth: UserOAuthModel
 | |
|     ):
 | |
|         oauth_account = user_oauth.oauth_accounts[0]
 | |
|         retrieved_user = await user_manager_oauth.get_by_oauth_account(
 | |
|             oauth_account.oauth_name, oauth_account.account_id
 | |
|         )
 | |
|         assert retrieved_user.id == user_oauth.id
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestCreateUser:
 | |
|     @pytest.mark.parametrize(
 | |
|         "email", ["king.arthur@camelot.bt", "King.Arthur@camelot.bt"]
 | |
|     )
 | |
|     async def test_existing_user(
 | |
|         self, email: str, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user = UserCreate(email=email, password="guinevere")
 | |
|         with pytest.raises(UserAlreadyExists):
 | |
|             await user_manager.create(user)
 | |
|         assert user_manager.on_after_register.called is False
 | |
| 
 | |
|     @pytest.mark.parametrize("email", ["lancelot@camelot.bt", "Lancelot@camelot.bt"])
 | |
|     async def test_regular_user(
 | |
|         self, email: str, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user = UserCreate(email=email, password="guinevere")
 | |
|         created_user = await user_manager.create(user)
 | |
|         assert type(created_user) == UserModel
 | |
| 
 | |
|         assert user_manager.on_after_register.called is True
 | |
| 
 | |
|     @pytest.mark.parametrize("safe,result", [(True, False), (False, True)])
 | |
|     async def test_superuser(
 | |
|         self, user_manager: UserManagerMock[UserModel], safe: bool, result: bool
 | |
|     ):
 | |
|         user = UserCreate(
 | |
|             email="lancelot@camelot.b", password="guinevere", is_superuser=True
 | |
|         )
 | |
|         created_user = await user_manager.create(user, safe)
 | |
|         assert type(created_user) == UserModel
 | |
|         assert created_user.is_superuser is result
 | |
| 
 | |
|         assert user_manager.on_after_register.called is True
 | |
| 
 | |
|     @pytest.mark.parametrize("safe,result", [(True, True), (False, False)])
 | |
|     async def test_is_active(
 | |
|         self, user_manager: UserManagerMock[UserModel], safe: bool, result: bool
 | |
|     ):
 | |
|         user = UserCreate(
 | |
|             email="lancelot@camelot.b", password="guinevere", is_active=False
 | |
|         )
 | |
|         created_user = await user_manager.create(user, safe)
 | |
|         assert type(created_user) == UserModel
 | |
|         assert created_user.is_active is result
 | |
| 
 | |
|         assert user_manager.on_after_register.called is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestOAuthCallback:
 | |
|     async def test_existing_user_with_oauth(
 | |
|         self,
 | |
|         user_manager_oauth: UserManagerMock[UserOAuthModel],
 | |
|         user_oauth: UserOAuthModel,
 | |
|     ):
 | |
|         oauth_account = user_oauth.oauth_accounts[0]
 | |
| 
 | |
|         user = await user_manager_oauth.oauth_callback(
 | |
|             oauth_account.oauth_name,
 | |
|             "UPDATED_TOKEN",
 | |
|             oauth_account.account_id,
 | |
|             oauth_account.account_email,
 | |
|         )
 | |
| 
 | |
|         assert user.id == user_oauth.id
 | |
|         assert len(user.oauth_accounts) == 2
 | |
|         assert user.oauth_accounts[0].id == oauth_account.id
 | |
|         assert user.oauth_accounts[0].oauth_name == "service1"
 | |
|         assert user.oauth_accounts[0].access_token == "UPDATED_TOKEN"
 | |
|         assert user.oauth_accounts[1].access_token == "TOKEN"
 | |
|         assert user.oauth_accounts[1].oauth_name == "service2"
 | |
| 
 | |
|         assert user_manager_oauth.on_after_register.called is False
 | |
| 
 | |
|     async def test_existing_user_without_oauth_associate(
 | |
|         self,
 | |
|         user_manager_oauth: UserManagerMock[UserOAuthModel],
 | |
|         superuser_oauth: UserOAuthModel,
 | |
|     ):
 | |
|         user = await user_manager_oauth.oauth_callback(
 | |
|             "service1",
 | |
|             "TOKEN",
 | |
|             "superuser_oauth1",
 | |
|             superuser_oauth.email,
 | |
|             1579000751,
 | |
|             associate_by_email=True,
 | |
|         )
 | |
| 
 | |
|         assert user.id == superuser_oauth.id
 | |
|         assert len(user.oauth_accounts) == 1
 | |
|         assert user.oauth_accounts[0].id is not None
 | |
| 
 | |
|         assert user_manager_oauth.on_after_register.called is False
 | |
| 
 | |
|     async def test_existing_user_without_oauth_no_associate(
 | |
|         self,
 | |
|         user_manager_oauth: UserManagerMock[UserOAuthModel],
 | |
|         superuser_oauth: UserOAuthModel,
 | |
|     ):
 | |
|         with pytest.raises(UserAlreadyExists):
 | |
|             await user_manager_oauth.oauth_callback(
 | |
|                 "service1",
 | |
|                 "TOKEN",
 | |
|                 "superuser_oauth1",
 | |
|                 superuser_oauth.email,
 | |
|                 1579000751,
 | |
|                 associate_by_email=False,
 | |
|             )
 | |
| 
 | |
|     async def test_new_user(self, user_manager_oauth: UserManagerMock[UserOAuthModel]):
 | |
|         user = await user_manager_oauth.oauth_callback(
 | |
|             "service1", "TOKEN", "new_user_oauth1", "galahad@camelot.bt", 1579000751
 | |
|         )
 | |
| 
 | |
|         assert user.email == "galahad@camelot.bt"
 | |
|         assert len(user.oauth_accounts) == 1
 | |
|         assert user.oauth_accounts[0].id is not None
 | |
| 
 | |
|         assert user_manager_oauth.on_after_register.called is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestOAuthAssociateCallback:
 | |
|     async def test_existing_user_without_oauth_associate(
 | |
|         self,
 | |
|         user_manager_oauth: UserManagerMock[UserOAuthModel],
 | |
|         superuser_oauth: UserOAuthModel,
 | |
|     ):
 | |
| 
 | |
|         user = await user_manager_oauth.oauth_associate_callback(
 | |
|             superuser_oauth,
 | |
|             "service1",
 | |
|             "TOKEN",
 | |
|             "superuser_oauth1",
 | |
|             superuser_oauth.email,
 | |
|             1579000751,
 | |
|         )
 | |
| 
 | |
|         assert user.id == user.id
 | |
|         assert len(user.oauth_accounts) == 1
 | |
|         assert user.oauth_accounts[0].id is not None
 | |
| 
 | |
|         assert user_manager_oauth.on_after_update.called is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestRequestVerifyUser:
 | |
|     async def test_user_inactive(
 | |
|         self, user_manager: UserManagerMock[UserModel], inactive_user: UserModel
 | |
|     ):
 | |
|         with pytest.raises(UserInactive):
 | |
|             await user_manager.request_verify(inactive_user)
 | |
| 
 | |
|     async def test_user_verified(
 | |
|         self, user_manager: UserManagerMock[UserModel], verified_user: UserModel
 | |
|     ):
 | |
|         with pytest.raises(UserAlreadyVerified):
 | |
|             await user_manager.request_verify(verified_user)
 | |
| 
 | |
|     async def test_user_active_not_verified(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel
 | |
|     ):
 | |
|         await user_manager.request_verify(user)
 | |
|         assert user_manager.on_after_request_verify.called is True
 | |
| 
 | |
|         actual_user = user_manager.on_after_request_verify.call_args[0][0]
 | |
|         actual_token = user_manager.on_after_request_verify.call_args[0][1]
 | |
| 
 | |
|         assert actual_user.id == user.id
 | |
|         decoded_token = decode_jwt(
 | |
|             actual_token,
 | |
|             user_manager.verification_token_secret,
 | |
|             audience=[user_manager.verification_token_audience],
 | |
|         )
 | |
|         assert decoded_token["user_id"] == str(user.id)
 | |
|         assert decoded_token["email"] == str(user.email)
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestVerifyUser:
 | |
|     async def test_invalid_token(self, user_manager: UserManagerMock[UserModel]):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             await user_manager.verify("foo")
 | |
| 
 | |
|     async def test_token_expired(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(user_id=user.id, email=user.email, lifetime=-1)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_missing_user_id(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(email=user.email)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_missing_user_email(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(user_id=user.id)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_invalid_user_id(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(user_id="foo", email=user.email)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_invalid_email(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(user_id=user.id, email="foo")
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_email_id_mismatch(
 | |
|         self,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         user: UserModel,
 | |
|         inactive_user: UserModel,
 | |
|         verify_token,
 | |
|     ):
 | |
|         with pytest.raises(InvalidVerifyToken):
 | |
|             token = verify_token(user_id=user.id, email=inactive_user.email)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_verified_user(
 | |
|         self,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         verified_user: UserModel,
 | |
|         verify_token,
 | |
|     ):
 | |
|         with pytest.raises(UserAlreadyVerified):
 | |
|             token = verify_token(user_id=verified_user.id, email=verified_user.email)
 | |
|             await user_manager.verify(token)
 | |
| 
 | |
|     async def test_inactive_user(
 | |
|         self,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         inactive_user: UserModel,
 | |
|         verify_token,
 | |
|     ):
 | |
|         token = verify_token(user_id=inactive_user.id, email=inactive_user.email)
 | |
|         verified_user = await user_manager.verify(token)
 | |
| 
 | |
|         assert verified_user.is_verified is True
 | |
|         assert verified_user.is_active is False
 | |
| 
 | |
|     async def test_active_user(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel, verify_token
 | |
|     ):
 | |
|         token = verify_token(user_id=user.id, email=user.email)
 | |
|         verified_user = await user_manager.verify(token)
 | |
| 
 | |
|         assert verified_user.is_verified is True
 | |
|         assert verified_user.is_active is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestForgotPassword:
 | |
|     async def test_user_inactive(
 | |
|         self, user_manager: UserManagerMock[UserModel], inactive_user: UserModel
 | |
|     ):
 | |
|         with pytest.raises(UserInactive):
 | |
|             await user_manager.forgot_password(inactive_user)
 | |
|         assert user_manager.on_after_forgot_password.called is False
 | |
| 
 | |
|     async def test_user_active(
 | |
|         self, user_manager: UserManagerMock[UserModel], user: UserModel
 | |
|     ):
 | |
|         await user_manager.forgot_password(user)
 | |
|         assert user_manager.on_after_forgot_password.called is True
 | |
| 
 | |
|         actual_user = user_manager.on_after_forgot_password.call_args[0][0]
 | |
|         actual_token = user_manager.on_after_forgot_password.call_args[0][1]
 | |
| 
 | |
|         assert actual_user.id == user.id
 | |
|         decoded_token = decode_jwt(
 | |
|             actual_token,
 | |
|             user_manager.reset_password_token_secret,
 | |
|             audience=[user_manager.reset_password_token_audience],
 | |
|         )
 | |
|         assert decoded_token["user_id"] == str(user.id)
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestResetPassword:
 | |
|     async def test_invalid_token(self, user_manager: UserManagerMock[UserModel]):
 | |
|         with pytest.raises(InvalidResetPasswordToken):
 | |
|             await user_manager.reset_password("foo", "guinevere")
 | |
|         assert user_manager._update.called is False
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     async def test_token_expired(
 | |
|         self,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         user: UserModel,
 | |
|         forgot_password_token,
 | |
|     ):
 | |
|         with pytest.raises(InvalidResetPasswordToken):
 | |
|             await user_manager.reset_password(
 | |
|                 forgot_password_token(user.id, lifetime=-1), "guinevere"
 | |
|             )
 | |
|         assert user_manager._update.called is False
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     @pytest.mark.parametrize("user_id", [None, "foo"])
 | |
|     async def test_valid_token_bad_payload(
 | |
|         self,
 | |
|         user_id: str,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         forgot_password_token,
 | |
|     ):
 | |
|         with pytest.raises(InvalidResetPasswordToken):
 | |
|             await user_manager.reset_password(
 | |
|                 forgot_password_token(user_id), "guinevere"
 | |
|             )
 | |
|         assert user_manager._update.called is False
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     async def test_not_existing_user(
 | |
|         self, user_manager: UserManagerMock[UserModel], forgot_password_token
 | |
|     ):
 | |
|         with pytest.raises(UserNotExists):
 | |
|             await user_manager.reset_password(
 | |
|                 forgot_password_token("d35d213e-f3d8-4f08-954a-7e0d1bea286f"),
 | |
|                 "guinevere",
 | |
|             )
 | |
|         assert user_manager._update.called is False
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     async def test_inactive_user(
 | |
|         self,
 | |
|         inactive_user: UserModel,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         forgot_password_token,
 | |
|     ):
 | |
|         with pytest.raises(UserInactive):
 | |
|             await user_manager.reset_password(
 | |
|                 forgot_password_token(inactive_user.id),
 | |
|                 "guinevere",
 | |
|             )
 | |
|         assert user_manager._update.called is False
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     async def test_invalid_password(
 | |
|         self,
 | |
|         user: UserModel,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         forgot_password_token,
 | |
|     ):
 | |
|         with pytest.raises(InvalidPasswordException):
 | |
|             await user_manager.reset_password(
 | |
|                 forgot_password_token(user.id),
 | |
|                 "h",
 | |
|             )
 | |
|         assert user_manager.on_after_reset_password.called is False
 | |
| 
 | |
|     async def test_valid_user_password(
 | |
|         self,
 | |
|         user: UserModel,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|         forgot_password_token,
 | |
|     ):
 | |
|         await user_manager.reset_password(forgot_password_token(user.id), "holygrail")
 | |
| 
 | |
|         assert user_manager._update.called is True
 | |
|         update_dict = user_manager._update.call_args[0][1]
 | |
|         assert update_dict == {"password": "holygrail"}
 | |
| 
 | |
|         assert user_manager.on_after_reset_password.called is True
 | |
|         actual_user = user_manager.on_after_reset_password.call_args[0][0]
 | |
|         assert actual_user.id == user.id
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestUpdateUser:
 | |
|     async def test_safe_update(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user_update = UserUpdate(first_name="Arthur", is_superuser=True)
 | |
|         updated_user = await user_manager.update(user_update, user, safe=True)
 | |
| 
 | |
|         assert updated_user.first_name == "Arthur"
 | |
|         assert updated_user.is_superuser is False
 | |
| 
 | |
|         assert user_manager.on_after_update.called is True
 | |
| 
 | |
|     async def test_unsafe_update(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user_update = UserUpdate(first_name="Arthur", is_superuser=True)
 | |
|         updated_user = await user_manager.update(user_update, user, safe=False)
 | |
| 
 | |
|         assert updated_user.first_name == "Arthur"
 | |
|         assert updated_user.is_superuser is True
 | |
| 
 | |
|         assert user_manager.on_after_update.called is True
 | |
| 
 | |
|     async def test_password_update_invalid(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user_update = UserUpdate(password="h")
 | |
|         with pytest.raises(InvalidPasswordException):
 | |
|             await user_manager.update(user_update, user, safe=True)
 | |
| 
 | |
|         assert user_manager.on_after_update.called is False
 | |
| 
 | |
|     async def test_password_update_valid(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         old_hashed_password = user.hashed_password
 | |
|         user_update = UserUpdate(password="holygrail")
 | |
|         updated_user = await user_manager.update(user_update, user, safe=True)
 | |
| 
 | |
|         assert updated_user.hashed_password != old_hashed_password
 | |
| 
 | |
|         assert user_manager.on_after_update.called is True
 | |
| 
 | |
|     async def test_email_update_already_existing(
 | |
|         self,
 | |
|         user: UserModel,
 | |
|         superuser: UserModel,
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|     ):
 | |
|         user_update = UserUpdate(email=superuser.email)
 | |
|         with pytest.raises(UserAlreadyExists):
 | |
|             await user_manager.update(user_update, user, safe=True)
 | |
| 
 | |
|         assert user_manager.on_after_update.called is False
 | |
| 
 | |
|     async def test_email_update_with_same_email(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         user_update = UserUpdate(email=user.email)
 | |
|         updated_user = await user_manager.update(user_update, user, safe=True)
 | |
| 
 | |
|         assert updated_user.email == user.email
 | |
| 
 | |
|         assert user_manager.on_after_update.called is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestDelete:
 | |
|     async def test_delete(
 | |
|         self, user: UserModel, user_manager: UserManagerMock[UserModel]
 | |
|     ):
 | |
|         await user_manager.delete(user)
 | |
| 
 | |
|         assert user_manager.on_before_delete.called is True
 | |
| 
 | |
|         assert user_manager.on_after_delete.called is True
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| @pytest.mark.manager
 | |
| class TestAuthenticate:
 | |
|     async def test_unknown_user(
 | |
|         self,
 | |
|         create_oauth2_password_request_form: Callable[
 | |
|             [str, str], OAuth2PasswordRequestForm
 | |
|         ],
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|     ):
 | |
|         form = create_oauth2_password_request_form("lancelot@camelot.bt", "guinevere")
 | |
|         user = await user_manager.authenticate(form)
 | |
|         assert user is None
 | |
| 
 | |
|     async def test_wrong_password(
 | |
|         self,
 | |
|         create_oauth2_password_request_form: Callable[
 | |
|             [str, str], OAuth2PasswordRequestForm
 | |
|         ],
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|     ):
 | |
|         form = create_oauth2_password_request_form("king.arthur@camelot.bt", "percival")
 | |
|         user = await user_manager.authenticate(form)
 | |
|         assert user is None
 | |
| 
 | |
|     async def test_valid_credentials(
 | |
|         self,
 | |
|         create_oauth2_password_request_form: Callable[
 | |
|             [str, str], OAuth2PasswordRequestForm
 | |
|         ],
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|     ):
 | |
|         form = create_oauth2_password_request_form(
 | |
|             "king.arthur@camelot.bt", "guinevere"
 | |
|         )
 | |
|         user = await user_manager.authenticate(form)
 | |
|         assert user is not None
 | |
|         assert user.email == "king.arthur@camelot.bt"
 | |
| 
 | |
|     async def test_upgrade_password_hash(
 | |
|         self,
 | |
|         mocker: MockerFixture,
 | |
|         create_oauth2_password_request_form: Callable[
 | |
|             [str, str], OAuth2PasswordRequestForm
 | |
|         ],
 | |
|         user_manager: UserManagerMock[UserModel],
 | |
|     ):
 | |
|         verify_and_update_password_patch = mocker.patch.object(
 | |
|             user_manager.password_helper, "verify_and_update"
 | |
|         )
 | |
|         verify_and_update_password_patch.return_value = (True, "updated_hash")
 | |
|         update_spy = mocker.spy(user_manager.user_db, "update")
 | |
| 
 | |
|         form = create_oauth2_password_request_form(
 | |
|             "king.arthur@camelot.bt", "guinevere"
 | |
|         )
 | |
|         user = await user_manager.authenticate(form)
 | |
|         assert user is not None
 | |
|         assert user.email == "king.arthur@camelot.bt"
 | |
|         assert update_spy.called is True
 | |
| 
 | |
| 
 | |
| def test_integer_id_mixin():
 | |
|     integer_id_mixin = IntegerIDMixin()
 | |
| 
 | |
|     assert integer_id_mixin.parse_id("123") == 123
 | |
|     assert integer_id_mixin.parse_id(123) == 123
 | |
| 
 | |
|     with pytest.raises(InvalidID):
 | |
|         integer_id_mixin.parse_id("123.42")
 | |
| 
 | |
|     with pytest.raises(InvalidID):
 | |
|         integer_id_mixin.parse_id(123.42)
 | |
| 
 | |
|     with pytest.raises(InvalidID):
 | |
|         integer_id_mixin.parse_id("abc")
 | 
