mirror of
				https://github.com/fastapi-users/fastapi-users.git
				synced 2025-11-04 06:37:51 +08:00 
			
		
		
		
	Ensure reset password token is single use
This commit is contained in:
		@ -367,6 +367,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
 | 
			
		||||
 | 
			
		||||
        token_data = {
 | 
			
		||||
            "user_id": str(user.id),
 | 
			
		||||
            "password_fgpt": self.password_helper.hash(user.hashed_password),
 | 
			
		||||
            "aud": self.reset_password_token_audience,
 | 
			
		||||
        }
 | 
			
		||||
        token = generate_jwt(
 | 
			
		||||
@ -404,6 +405,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            user_id = data["user_id"]
 | 
			
		||||
            password_fingerprint = data["password_fgpt"]
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            raise exceptions.InvalidResetPasswordToken()
 | 
			
		||||
 | 
			
		||||
@ -414,6 +416,12 @@ class BaseUserManager(Generic[models.UP, models.ID]):
 | 
			
		||||
 | 
			
		||||
        user = await self.get(parsed_id)
 | 
			
		||||
 | 
			
		||||
        valid_password_fingerprint, _ = self.password_helper.verify_and_update(
 | 
			
		||||
            user.hashed_password, password_fingerprint
 | 
			
		||||
        )
 | 
			
		||||
        if not valid_password_fingerprint:
 | 
			
		||||
            raise exceptions.InvalidResetPasswordToken()
 | 
			
		||||
 | 
			
		||||
        if not user.is_active:
 | 
			
		||||
            raise exceptions.UserInactive()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -46,11 +46,17 @@ def verify_token(user_manager: UserManagerMock[UserModel]):
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def forgot_password_token(user_manager: UserManagerMock[UserModel]):
 | 
			
		||||
    def _forgot_password_token(
 | 
			
		||||
        user_id=None, lifetime=user_manager.reset_password_token_lifetime_seconds
 | 
			
		||||
        user_id=None,
 | 
			
		||||
        current_password_hash=None,
 | 
			
		||||
        lifetime=user_manager.reset_password_token_lifetime_seconds,
 | 
			
		||||
    ):
 | 
			
		||||
        data = {"aud": user_manager.reset_password_token_audience}
 | 
			
		||||
        if user_id is not None:
 | 
			
		||||
            data["user_id"] = str(user_id)
 | 
			
		||||
        if current_password_hash is not None:
 | 
			
		||||
            data["password_fgpt"] = user_manager.password_helper.hash(
 | 
			
		||||
                current_password_hash
 | 
			
		||||
            )
 | 
			
		||||
        return generate_jwt(data, user_manager.reset_password_token_secret, lifetime)
 | 
			
		||||
 | 
			
		||||
    return _forgot_password_token
 | 
			
		||||
@ -409,6 +415,11 @@ class TestForgotPassword:
 | 
			
		||||
        )
 | 
			
		||||
        assert decoded_token["user_id"] == str(user.id)
 | 
			
		||||
 | 
			
		||||
        valid_fingerprint, _ = user_manager.password_helper.verify_and_update(
 | 
			
		||||
            user.hashed_password, decoded_token["password_fgpt"]
 | 
			
		||||
        )
 | 
			
		||||
        assert valid_fingerprint is True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.asyncio
 | 
			
		||||
@pytest.mark.manager
 | 
			
		||||
@ -427,7 +438,10 @@ class TestResetPassword:
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(InvalidResetPasswordToken):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token(user.id, lifetime=-1), "guinevere"
 | 
			
		||||
                forgot_password_token(
 | 
			
		||||
                    user.id, current_password_hash=user.hashed_password, lifetime=-1
 | 
			
		||||
                ),
 | 
			
		||||
                "guinevere",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager._update.called is False
 | 
			
		||||
        assert user_manager.on_after_reset_password.called is False
 | 
			
		||||
@ -441,7 +455,8 @@ class TestResetPassword:
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(InvalidResetPasswordToken):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token(user_id), "guinevere"
 | 
			
		||||
                forgot_password_token(user_id, current_password_hash="old_password"),
 | 
			
		||||
                "guinevere",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager._update.called is False
 | 
			
		||||
        assert user_manager.on_after_reset_password.called is False
 | 
			
		||||
@ -451,7 +466,24 @@ class TestResetPassword:
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(UserNotExists):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token("d35d213e-f3d8-4f08-954a-7e0d1bea286f"),
 | 
			
		||||
                forgot_password_token(
 | 
			
		||||
                    "d35d213e-f3d8-4f08-954a-7e0d1bea286f",
 | 
			
		||||
                    current_password_hash="old_password",
 | 
			
		||||
                ),
 | 
			
		||||
                "guinevere",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager._update.called is False
 | 
			
		||||
        assert user_manager.on_after_reset_password.called is False
 | 
			
		||||
 | 
			
		||||
    async def test_already_used_token(
 | 
			
		||||
        self,
 | 
			
		||||
        user: UserModel,
 | 
			
		||||
        user_manager: UserManagerMock[UserModel],
 | 
			
		||||
        forgot_password_token,
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(InvalidResetPasswordToken):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token(user.id, current_password_hash="old_password"),
 | 
			
		||||
                "guinevere",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager._update.called is False
 | 
			
		||||
@ -465,7 +497,10 @@ class TestResetPassword:
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(UserInactive):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token(inactive_user.id),
 | 
			
		||||
                forgot_password_token(
 | 
			
		||||
                    inactive_user.id,
 | 
			
		||||
                    current_password_hash=inactive_user.hashed_password,
 | 
			
		||||
                ),
 | 
			
		||||
                "guinevere",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager._update.called is False
 | 
			
		||||
@ -479,7 +514,9 @@ class TestResetPassword:
 | 
			
		||||
    ):
 | 
			
		||||
        with pytest.raises(InvalidPasswordException):
 | 
			
		||||
            await user_manager.reset_password(
 | 
			
		||||
                forgot_password_token(user.id),
 | 
			
		||||
                forgot_password_token(
 | 
			
		||||
                    user.id, current_password_hash=user.hashed_password
 | 
			
		||||
                ),
 | 
			
		||||
                "h",
 | 
			
		||||
            )
 | 
			
		||||
        assert user_manager.on_after_reset_password.called is False
 | 
			
		||||
@ -490,7 +527,10 @@ class TestResetPassword:
 | 
			
		||||
        user_manager: UserManagerMock[UserModel],
 | 
			
		||||
        forgot_password_token,
 | 
			
		||||
    ):
 | 
			
		||||
        await user_manager.reset_password(forgot_password_token(user.id), "holygrail")
 | 
			
		||||
        await user_manager.reset_password(
 | 
			
		||||
            forgot_password_token(user.id, current_password_hash=user.hashed_password),
 | 
			
		||||
            "holygrail",
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        assert user_manager._update.called is True
 | 
			
		||||
        update_dict = user_manager._update.call_args[0][1]
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user