from unittest.mock import MagicMock import asynctest import jwt import pytest from fastapi import FastAPI from starlette import status from starlette.requests import Request from starlette.testclient import TestClient from fastapi_users.authentication import Authenticator from fastapi_users.router import ErrorCode, Event, get_user_router from fastapi_users.utils import JWT_ALGORITHM, generate_jwt from tests.conftest import MockAuthentication, User, UserCreate, UserUpdate, UserDB SECRET = "SECRET" LIFETIME = 3600 @pytest.fixture def forgot_password_token(): def _forgot_password_token(user_id=None, lifetime=LIFETIME): data = {"aud": "fastapi-users:reset"} if user_id is not None: data["user_id"] = user_id return generate_jwt(data, lifetime, SECRET, JWT_ALGORITHM) return _forgot_password_token def event_handler_sync(): return MagicMock(return_value=None) def event_handler_async(): return asynctest.CoroutineMock(return_value=None) @pytest.fixture(params=[event_handler_sync, event_handler_async]) def event_handler(request): return request.param() @pytest.fixture() def test_app_client(mock_user_db, mock_authentication, event_handler) -> TestClient: mock_authentication_bis = MockAuthentication(name="mock-bis") authenticator = Authenticator( [mock_authentication, mock_authentication_bis], mock_user_db ) user_router = get_user_router( mock_user_db, User, UserCreate, UserUpdate, UserDB, authenticator, SECRET, LIFETIME, ) user_router.add_event_handler(Event.ON_AFTER_REGISTER, event_handler) user_router.add_event_handler(Event.ON_AFTER_FORGOT_PASSWORD, event_handler) user_router.add_event_handler(Event.ON_AFTER_UPDATE, event_handler) app = FastAPI() app.include_router(user_router) return TestClient(app) @pytest.mark.router class TestRegister: def test_empty_body(self, test_app_client: TestClient, event_handler): response = test_app_client.post("/register", json={}) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert event_handler.called is False def test_missing_password(self, test_app_client: TestClient, event_handler): json = {"email": "king.arthur@camelot.bt"} response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert event_handler.called is False def test_wrong_email(self, test_app_client: TestClient, event_handler): json = {"email": "king.arthur", "password": "guinevere"} response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert event_handler.called is False def test_existing_user(self, test_app_client: TestClient, event_handler): json = {"email": "king.arthur@camelot.bt", "password": "guinevere"} response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.REGISTER_USER_ALREADY_EXISTS assert event_handler.called is False def test_valid_body(self, test_app_client: TestClient, event_handler): json = {"email": "lancelot@camelot.bt", "password": "guinevere"} response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_201_CREATED assert event_handler.called is True response_json = response.json() assert "hashed_password" not in response_json assert "password" not in response_json assert response_json["id"] is not None actual_user = event_handler.call_args[0][0] assert actual_user.id == response_json["id"] request = event_handler.call_args[0][1] assert isinstance(request, Request) def test_valid_body_is_superuser(self, test_app_client: TestClient, event_handler): json = { "email": "lancelot@camelot.bt", "password": "guinevere", "is_superuser": True, } response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_201_CREATED assert event_handler.called is True response_json = response.json() assert response_json["is_superuser"] is False def test_valid_body_is_active(self, test_app_client: TestClient, event_handler): json = { "email": "lancelot@camelot.bt", "password": "guinevere", "is_active": False, } response = test_app_client.post("/register", json=json) assert response.status_code == status.HTTP_201_CREATED assert event_handler.called is True response_json = response.json() assert response_json["is_active"] is True @pytest.mark.router @pytest.mark.parametrize("path", ["/login/mock", "/login/mock-bis"]) class TestLogin: def test_empty_body(self, path, test_app_client: TestClient): response = test_app_client.post(path, data={}) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_missing_username(self, path, test_app_client: TestClient): data = {"password": "guinevere"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_missing_password(self, path, test_app_client: TestClient): data = {"username": "king.arthur@camelot.bt"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_not_existing_user(self, path, test_app_client: TestClient): data = {"username": "lancelot@camelot.bt", "password": "guinevere"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.LOGIN_BAD_CREDENTIALS def test_wrong_password(self, path, test_app_client: TestClient): data = {"username": "king.arthur@camelot.bt", "password": "percival"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.LOGIN_BAD_CREDENTIALS def test_valid_credentials(self, path, test_app_client: TestClient, user: UserDB): data = {"username": "king.arthur@camelot.bt", "password": "guinevere"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_200_OK assert response.json() == {"token": user.id} def test_inactive_user(self, path, test_app_client: TestClient): data = {"username": "percival@camelot.bt", "password": "angharad"} response = test_app_client.post(path, data=data) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.LOGIN_BAD_CREDENTIALS @pytest.mark.router @pytest.mark.parametrize("path", ["/logout/mock", "/logout/mock-bis"]) class TestLogout: def test_missing_token(self, path, test_app_client: TestClient): response = test_app_client.post(path) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_unimplemented_logout( self, mocker, path, test_app_client: TestClient, user: UserDB ): get_logout_response_spy = mocker.spy(MockAuthentication, "get_logout_response") response = test_app_client.post( path, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_202_ACCEPTED get_logout_response_spy.assert_called_once() @pytest.mark.router class TestForgotPassword: def test_empty_body(self, test_app_client: TestClient, event_handler): response = test_app_client.post("/forgot-password", json={}) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert event_handler.called is False def test_not_existing_user(self, test_app_client: TestClient, event_handler): json = {"email": "lancelot@camelot.bt"} response = test_app_client.post("/forgot-password", json=json) assert response.status_code == status.HTTP_202_ACCEPTED assert event_handler.called is False def test_inactive_user(self, test_app_client: TestClient, event_handler): json = {"email": "percival@camelot.bt"} response = test_app_client.post("/forgot-password", json=json) assert response.status_code == status.HTTP_202_ACCEPTED assert event_handler.called is False def test_existing_user(self, test_app_client: TestClient, event_handler, user): json = {"email": "king.arthur@camelot.bt"} response = test_app_client.post("/forgot-password", json=json) assert response.status_code == status.HTTP_202_ACCEPTED assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id actual_token = event_handler.call_args[0][1] decoded_token = jwt.decode( actual_token, SECRET, audience="fastapi-users:reset", algorithms=[JWT_ALGORITHM], ) assert decoded_token["user_id"] == user.id request = event_handler.call_args[0][2] assert isinstance(request, Request) @pytest.mark.router class TestResetPassword: def test_empty_body(self, test_app_client: TestClient): response = test_app_client.post("/reset-password", json={}) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_missing_token(self, test_app_client: TestClient): json = {"password": "guinevere"} response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_missing_password(self, test_app_client: TestClient): json = {"token": "foo"} response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY def test_invalid_token(self, test_app_client: TestClient): json = {"token": "foo", "password": "guinevere"} response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.RESET_PASSWORD_BAD_TOKEN def test_valid_token_missing_user_id_payload( self, mocker, mock_user_db, test_app_client: TestClient, forgot_password_token ): mocker.spy(mock_user_db, "update") json = {"token": forgot_password_token(), "password": "holygrail"} response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.RESET_PASSWORD_BAD_TOKEN assert mock_user_db.update.called is False def test_inactive_user( self, mocker, mock_user_db, test_app_client: TestClient, forgot_password_token, inactive_user: UserDB, ): mocker.spy(mock_user_db, "update") json = { "token": forgot_password_token(inactive_user.id), "password": "holygrail", } response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_400_BAD_REQUEST assert response.json()["detail"] == ErrorCode.RESET_PASSWORD_BAD_TOKEN assert mock_user_db.update.called is False def test_existing_user( self, mocker, mock_user_db, test_app_client: TestClient, forgot_password_token, user: UserDB, ): mocker.spy(mock_user_db, "update") current_hashed_passord = user.hashed_password json = {"token": forgot_password_token(user.id), "password": "holygrail"} response = test_app_client.post("/reset-password", json=json) assert response.status_code == status.HTTP_200_OK assert mock_user_db.update.called is True updated_user = mock_user_db.update.call_args[0][0] assert updated_user.hashed_password != current_hashed_passord @pytest.mark.router class TestMe: def test_missing_token(self, test_app_client: TestClient): response = test_app_client.get("/me") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_inactive_user(self, test_app_client: TestClient, inactive_user: UserDB): response = test_app_client.get( "/me", headers={"Authorization": f"Bearer {inactive_user.id}"} ) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_active_user(self, test_app_client: TestClient, user: UserDB): response = test_app_client.get( "/me", headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["id"] == user.id assert response_json["email"] == user.email @pytest.mark.router class TestUpdateMe: def test_missing_token(self, test_app_client: TestClient, event_handler): response = test_app_client.patch("/me") assert response.status_code == status.HTTP_401_UNAUTHORIZED assert event_handler.called is False def test_inactive_user( self, test_app_client: TestClient, inactive_user: UserDB, event_handler ): response = test_app_client.patch( "/me", headers={"Authorization": f"Bearer {inactive_user.id}"} ) assert response.status_code == status.HTTP_401_UNAUTHORIZED assert event_handler.called is False def test_empty_body(self, test_app_client: TestClient, user: UserDB, event_handler): response = test_app_client.patch( "/me", json={}, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["email"] == user.email assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id updated_fields = event_handler.call_args[0][1] assert updated_fields == {} request = event_handler.call_args[0][2] assert isinstance(request, Request) def test_valid_body(self, test_app_client: TestClient, user: UserDB, event_handler): json = {"email": "king.arthur@tintagel.bt"} response = test_app_client.patch( "/me", json=json, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["email"] == "king.arthur@tintagel.bt" assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id updated_fields = event_handler.call_args[0][1] assert updated_fields == {"email": "king.arthur@tintagel.bt"} request = event_handler.call_args[0][2] assert isinstance(request, Request) def test_valid_body_is_superuser( self, test_app_client: TestClient, user: UserDB, event_handler ): json = {"is_superuser": True} response = test_app_client.patch( "/me", json=json, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["is_superuser"] is False assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id updated_fields = event_handler.call_args[0][1] assert updated_fields == {} request = event_handler.call_args[0][2] assert isinstance(request, Request) def test_valid_body_is_active( self, test_app_client: TestClient, user: UserDB, event_handler ): json = {"is_active": False} response = test_app_client.patch( "/me", json=json, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["is_active"] is True assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id updated_fields = event_handler.call_args[0][1] assert updated_fields == {} request = event_handler.call_args[0][2] assert isinstance(request, Request) def test_valid_body_password( self, mocker, mock_user_db, test_app_client: TestClient, user: UserDB, event_handler, ): mocker.spy(mock_user_db, "update") current_hashed_passord = user.hashed_password json = {"password": "merlin"} response = test_app_client.patch( "/me", json=json, headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_200_OK assert mock_user_db.update.called is True updated_user = mock_user_db.update.call_args[0][0] assert updated_user.hashed_password != current_hashed_passord assert event_handler.called is True actual_user = event_handler.call_args[0][0] assert actual_user.id == user.id updated_fields = event_handler.call_args[0][1] assert updated_fields == {"password": "merlin"} request = event_handler.call_args[0][2] assert isinstance(request, Request) @pytest.mark.router class TestListUsers: def test_missing_token(self, test_app_client: TestClient): response = test_app_client.get("/") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_regular_user(self, test_app_client: TestClient, user: UserDB): response = test_app_client.get( "/", headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_superuser(self, test_app_client: TestClient, superuser: UserDB): response = test_app_client.get( "/", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert len(response_json) == 3 for user in response_json: assert "id" in user assert "hashed_password" not in user @pytest.mark.router class TestGetUser: def test_missing_token(self, test_app_client: TestClient): response = test_app_client.get("/000") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_regular_user(self, test_app_client: TestClient, user: UserDB): response = test_app_client.get( "/000", headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_not_existing_user(self, test_app_client: TestClient, superuser: UserDB): response = test_app_client.get( "/000", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_404_NOT_FOUND def test_superuser( self, test_app_client: TestClient, user: UserDB, superuser: UserDB ): response = test_app_client.get( f"/{user.id}", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["id"] == user.id assert "hashed_password" not in response_json @pytest.mark.router class TestUpdateUser: def test_missing_token(self, test_app_client: TestClient): response = test_app_client.patch("/000") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_regular_user(self, test_app_client: TestClient, user: UserDB): response = test_app_client.patch( "/000", headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_not_existing_user(self, test_app_client: TestClient, superuser: UserDB): response = test_app_client.patch( "/000", json={}, headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_404_NOT_FOUND def test_empty_body( self, test_app_client: TestClient, user: UserDB, superuser: UserDB ): response = test_app_client.patch( f"/{user.id}", json={}, headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["email"] == user.email def test_valid_body( self, test_app_client: TestClient, user: UserDB, superuser: UserDB ): json = {"email": "king.arthur@tintagel.bt"} response = test_app_client.patch( f"/{user.id}", json=json, headers={"Authorization": f"Bearer {superuser.id}"}, ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["email"] == "king.arthur@tintagel.bt" def test_valid_body_is_superuser( self, test_app_client: TestClient, user: UserDB, superuser: UserDB ): json = {"is_superuser": True} response = test_app_client.patch( f"/{user.id}", json=json, headers={"Authorization": f"Bearer {superuser.id}"}, ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["is_superuser"] is True def test_valid_body_is_active( self, test_app_client: TestClient, user: UserDB, superuser: UserDB ): json = {"is_active": False} response = test_app_client.patch( f"/{user.id}", json=json, headers={"Authorization": f"Bearer {superuser.id}"}, ) assert response.status_code == status.HTTP_200_OK response_json = response.json() assert response_json["is_active"] is False def test_valid_body_password( self, mocker, mock_user_db, test_app_client: TestClient, user: UserDB, superuser: UserDB, ): mocker.spy(mock_user_db, "update") current_hashed_passord = user.hashed_password json = {"password": "merlin"} response = test_app_client.patch( f"/{user.id}", json=json, headers={"Authorization": f"Bearer {superuser.id}"}, ) assert response.status_code == status.HTTP_200_OK assert mock_user_db.update.called is True updated_user = mock_user_db.update.call_args[0][0] assert updated_user.hashed_password != current_hashed_passord @pytest.mark.router class TestDeleteUser: def test_missing_token(self, test_app_client: TestClient): response = test_app_client.delete("/000") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_regular_user(self, test_app_client: TestClient, user: UserDB): response = test_app_client.delete( "/000", headers={"Authorization": f"Bearer {user.id}"} ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_not_existing_user(self, test_app_client: TestClient, superuser: UserDB): response = test_app_client.delete( "/000", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_404_NOT_FOUND def test_superuser( self, mocker, mock_user_db, test_app_client: TestClient, user: UserDB, superuser: UserDB, ): mocker.spy(mock_user_db, "delete") response = test_app_client.delete( f"/{user.id}", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_204_NO_CONTENT assert response.json() is None assert mock_user_db.delete.called is True deleted_user = mock_user_db.delete.call_args[0][0] assert deleted_user.id == user.id