diff --git a/fastapi_users/manager.py b/fastapi_users/manager.py index 58a3ee5c..15501372 100644 --- a/fastapi_users/manager.py +++ b/fastapi_users/manager.py @@ -367,6 +367,7 @@ class BaseUserManager(Generic[models.UP, models.ID]): token_data = { "user_id": str(user.id), + "password_fgpt": self.password_helper.hash(user.hashed_password), "aud": self.reset_password_token_audience, } token = generate_jwt( @@ -404,6 +405,7 @@ class BaseUserManager(Generic[models.UP, models.ID]): try: user_id = data["user_id"] + password_fingerprint = data["password_fgpt"] except KeyError: raise exceptions.InvalidResetPasswordToken() @@ -414,6 +416,12 @@ class BaseUserManager(Generic[models.UP, models.ID]): user = await self.get(parsed_id) + valid_password_fingerprint, _ = self.password_helper.verify_and_update( + user.hashed_password, password_fingerprint + ) + if not valid_password_fingerprint: + raise exceptions.InvalidResetPasswordToken() + if not user.is_active: raise exceptions.UserInactive() diff --git a/tests/test_manager.py b/tests/test_manager.py index bfafad0a..36fc1ba6 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -46,11 +46,17 @@ def verify_token(user_manager: UserManagerMock[UserModel]): @pytest.fixture def forgot_password_token(user_manager: UserManagerMock[UserModel]): def _forgot_password_token( - user_id=None, lifetime=user_manager.reset_password_token_lifetime_seconds + user_id=None, + current_password_hash=None, + lifetime=user_manager.reset_password_token_lifetime_seconds, ): data = {"aud": user_manager.reset_password_token_audience} if user_id is not None: data["user_id"] = str(user_id) + if current_password_hash is not None: + data["password_fgpt"] = user_manager.password_helper.hash( + current_password_hash + ) return generate_jwt(data, user_manager.reset_password_token_secret, lifetime) return _forgot_password_token @@ -409,6 +415,11 @@ class TestForgotPassword: ) assert decoded_token["user_id"] == str(user.id) + valid_fingerprint, _ = user_manager.password_helper.verify_and_update( + user.hashed_password, decoded_token["password_fgpt"] + ) + assert valid_fingerprint is True + @pytest.mark.asyncio @pytest.mark.manager @@ -427,7 +438,10 @@ class TestResetPassword: ): with pytest.raises(InvalidResetPasswordToken): await user_manager.reset_password( - forgot_password_token(user.id, lifetime=-1), "guinevere" + forgot_password_token( + user.id, current_password_hash=user.hashed_password, lifetime=-1 + ), + "guinevere", ) assert user_manager._update.called is False assert user_manager.on_after_reset_password.called is False @@ -441,7 +455,8 @@ class TestResetPassword: ): with pytest.raises(InvalidResetPasswordToken): await user_manager.reset_password( - forgot_password_token(user_id), "guinevere" + forgot_password_token(user_id, current_password_hash="old_password"), + "guinevere", ) assert user_manager._update.called is False assert user_manager.on_after_reset_password.called is False @@ -451,7 +466,24 @@ class TestResetPassword: ): with pytest.raises(UserNotExists): await user_manager.reset_password( - forgot_password_token("d35d213e-f3d8-4f08-954a-7e0d1bea286f"), + forgot_password_token( + "d35d213e-f3d8-4f08-954a-7e0d1bea286f", + current_password_hash="old_password", + ), + "guinevere", + ) + assert user_manager._update.called is False + assert user_manager.on_after_reset_password.called is False + + async def test_already_used_token( + self, + user: UserModel, + user_manager: UserManagerMock[UserModel], + forgot_password_token, + ): + with pytest.raises(InvalidResetPasswordToken): + await user_manager.reset_password( + forgot_password_token(user.id, current_password_hash="old_password"), "guinevere", ) assert user_manager._update.called is False @@ -465,7 +497,10 @@ class TestResetPassword: ): with pytest.raises(UserInactive): await user_manager.reset_password( - forgot_password_token(inactive_user.id), + forgot_password_token( + inactive_user.id, + current_password_hash=inactive_user.hashed_password, + ), "guinevere", ) assert user_manager._update.called is False @@ -479,7 +514,9 @@ class TestResetPassword: ): with pytest.raises(InvalidPasswordException): await user_manager.reset_password( - forgot_password_token(user.id), + forgot_password_token( + user.id, current_password_hash=user.hashed_password + ), "h", ) assert user_manager.on_after_reset_password.called is False @@ -490,7 +527,10 @@ class TestResetPassword: user_manager: UserManagerMock[UserModel], forgot_password_token, ): - await user_manager.reset_password(forgot_password_token(user.id), "holygrail") + await user_manager.reset_password( + forgot_password_token(user.id, current_password_hash=user.hashed_password), + "holygrail", + ) assert user_manager._update.called is True update_dict = user_manager._update.call_args[0][1]