From 950876850244228b82bac3802d504385449bafd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Voron?= Date: Mon, 25 May 2020 08:13:42 +0200 Subject: [PATCH] Add get_optional_* dependency callables --- docs/usage/dependency-callables.md | 55 ++++++++++-- fastapi_users/authentication/__init__.py | 40 +++++++-- fastapi_users/fastapi_users.py | 7 ++ tests/test_fastapi_users.py | 109 +++++++++++++++++++++++ 4 files changed, 196 insertions(+), 15 deletions(-) diff --git a/docs/usage/dependency-callables.md b/docs/usage/dependency-callables.md index d2572399..090143ed 100644 --- a/docs/usage/dependency-callables.md +++ b/docs/usage/dependency-callables.md @@ -10,9 +10,9 @@ Get the current user (**active or not**). Will throw a `401 Unauthorized` if missing or wrong credentials. ```py -@app.get('/protected-route') +@app.get("/protected-route") def protected_route(user: User = Depends(fastapi_users.get_current_user)): - return f'Hello, {user.email}' + return f"Hello, {user.email}" ``` ## `get_current_active_user` @@ -20,9 +20,9 @@ def protected_route(user: User = Depends(fastapi_users.get_current_user)): Get the current active user. Will throw a `401 Unauthorized` if missing or wrong credentials or if the user is not active. ```py -@app.get('/protected-route') +@app.get("/protected-route") def protected_route(user: User = Depends(fastapi_users.get_current_active_user)): - return f'Hello, {user.email}' + return f"Hello, {user.email}" ``` ## `get_current_superuser` @@ -30,9 +30,48 @@ def protected_route(user: User = Depends(fastapi_users.get_current_active_user)) Get the current superuser. Will throw a `401 Unauthorized` if missing or wrong credentials or if the user is not active. Will throw a `403 Forbidden` if the user is not a superuser. ```py -@app.get('/protected-route') +@app.get("/protected-route") def protected_route(user: User = Depends(fastapi_users.get_current_superuser)): - return f'Hello, {user.email}' + return f"Hello, {user.email}" +``` + +## `get_optional_current_user` + +Get the current user (**active or not**). Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not. + +```py +@app.get("/optional-user-route") +def optional_user_route(user: Optional[User] = Depends(fastapi_users.get_optional_current_user)): + if user: + return f"Hello, {user.email}" + else: + return "Hello, anonymous" +``` + +## `get_optional_current_active_user` + +Get the current active user. Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not. + +```py +@app.get("/optional-user-route") +def optional_user_route(user: User = Depends(fastapi_users.get_optional_current_active_user)): + if user: + return f"Hello, {user.email}" + else: + return "Hello, anonymous" +``` + +## `get_optional_current_superuser` + +Get the current superuser. Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not. + +```py +@app.get("/optional-user-route") +def optional_user_route(user: User = Depends(fastapi_users.get_optional_current_superuser)): + if user: + return f"Hello, {user.email}" + else: + return "Hello, anonymous" ``` ## In path operation @@ -40,9 +79,9 @@ def protected_route(user: User = Depends(fastapi_users.get_current_superuser)): If you don't need a user, you can use more clear way: ```py -@app.get('/protected-route', dependencies=[Depends(fastapi_users.get_current_superuser)]) +@app.get("/protected-route", dependencies=[Depends(fastapi_users.get_current_superuser)]) def protected_route(): - return 'Hello, some user.' + return "Hello, some user." ``` You can read more about this [in FastAPI docs](https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-in-path-operation-decorators/). diff --git a/fastapi_users/authentication/__init__.py b/fastapi_users/authentication/__init__.py index 26508664..7ee8eef1 100644 --- a/fastapi_users/authentication/__init__.py +++ b/fastapi_users/authentication/__init__.py @@ -1,6 +1,6 @@ import re from inspect import Parameter, Signature -from typing import Sequence +from typing import Optional, Sequence from fastapi import Depends, HTTPException, status from makefun import with_signature @@ -64,20 +64,43 @@ class Authenticator: except ValueError: raise DuplicateBackendNamesError() + @with_signature(signature, func_name="get_optional_current_user") + async def get_optional_current_user(*args, **kwargs): + return await self._authenticate(*args, **kwargs) + + @with_signature(signature, func_name="get_optional_current_active_user") + async def get_optional_current_active_user(*args, **kwargs): + user = await get_optional_current_user(*args, **kwargs) + if not user or not user.is_active: + return None + return user + + @with_signature(signature, func_name="get_optional_current_superuser") + async def get_optional_current_superuser(*args, **kwargs): + user = await get_optional_current_active_user(*args, **kwargs) + if not user or not user.is_superuser: + return None + return user + @with_signature(signature, func_name="get_current_user") async def get_current_user(*args, **kwargs): - return await self._authenticate(*args, **kwargs) + user = await get_optional_current_user(*args, **kwargs) + if user is None: + raise self._get_credentials_exception() + return user @with_signature(signature, func_name="get_current_active_user") async def get_current_active_user(*args, **kwargs): - user = await get_current_user(*args, **kwargs) - if not user.is_active: + user = await get_optional_current_active_user(*args, **kwargs) + if user is None: raise self._get_credentials_exception() return user @with_signature(signature, func_name="get_current_superuser") async def get_current_superuser(*args, **kwargs): - user = await get_current_active_user(*args, **kwargs) + user = await get_optional_current_active_user(*args, **kwargs) + if user is None: + raise self._get_credentials_exception() if not user.is_superuser: raise self._get_credentials_exception(status.HTTP_403_FORBIDDEN) return user @@ -85,15 +108,18 @@ class Authenticator: self.get_current_user = get_current_user self.get_current_active_user = get_current_active_user self.get_current_superuser = get_current_superuser + self.get_optional_current_user = get_optional_current_user + self.get_optional_current_active_user = get_optional_current_active_user + self.get_optional_current_superuser = get_optional_current_superuser - async def _authenticate(self, *args, **kwargs) -> BaseUserDB: + async def _authenticate(self, *args, **kwargs) -> Optional[BaseUserDB]: for backend in self.backends: token: str = kwargs[name_to_variable_name(backend.name)] if token: user = await backend(token, self.user_db) if user is not None: return user - raise self._get_credentials_exception() + return None def _get_credentials_exception( self, status_code: int = status.HTTP_401_UNAUTHORIZED diff --git a/fastapi_users/fastapi_users.py b/fastapi_users/fastapi_users.py index 7fb88cd3..304332e4 100644 --- a/fastapi_users/fastapi_users.py +++ b/fastapi_users/fastapi_users.py @@ -59,6 +59,13 @@ class FastAPIUsers: self.get_current_user = self.authenticator.get_current_user self.get_current_active_user = self.authenticator.get_current_active_user self.get_current_superuser = self.authenticator.get_current_superuser + self.get_optional_current_user = self.authenticator.get_optional_current_user + self.get_optional_current_active_user = ( + self.authenticator.get_optional_current_active_user + ) + self.get_optional_current_superuser = ( + self.authenticator.get_optional_current_superuser + ) def get_register_router( self, after_register: Optional[Callable[[models.UD, Request], None]] = None, diff --git a/tests/test_fastapi_users.py b/tests/test_fastapi_users.py index af9ea659..4e50e9a9 100644 --- a/tests/test_fastapi_users.py +++ b/tests/test_fastapi_users.py @@ -34,6 +34,22 @@ async def test_app_client( def current_superuser(user=Depends(fastapi_users.get_current_superuser)): return user + @app.get("/optional-current-user") + def optional_current_user(user=Depends(fastapi_users.get_optional_current_user)): + return user + + @app.get("/optional-current-active-user") + def optional_current_active_user( + user=Depends(fastapi_users.get_optional_current_active_user), + ): + return user + + @app.get("/optional-current-superuser") + def optional_current_superuser( + user=Depends(fastapi_users.get_optional_current_superuser), + ): + return user + return await get_test_client(app) @@ -161,3 +177,96 @@ class TestGetCurrentSuperuser: "/current-superuser", headers={"Authorization": f"Bearer {superuser.id}"} ) assert response.status_code == status.HTTP_200_OK + + +@pytest.mark.fastapi_users +@pytest.mark.asyncio +class TestOptionalGetCurrentUser: + async def test_missing_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get("/optional-current-user") + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_invalid_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get( + "/optional-current-user", headers={"Authorization": "Bearer foo"} + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_valid_token(self, test_app_client: httpx.AsyncClient, user: UserDB): + response = await test_app_client.get( + "/optional-current-user", headers={"Authorization": f"Bearer {user.id}"} + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is not None + + +@pytest.mark.fastapi_users +@pytest.mark.asyncio +class TestOptionalGetCurrentActiveUser: + async def test_missing_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get("/optional-current-active-user") + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_invalid_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get( + "/optional-current-active-user", headers={"Authorization": "Bearer foo"} + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_valid_token_inactive_user( + self, test_app_client: httpx.AsyncClient, inactive_user: UserDB + ): + response = await test_app_client.get( + "/optional-current-active-user", + headers={"Authorization": f"Bearer {inactive_user.id}"}, + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_valid_token(self, test_app_client: httpx.AsyncClient, user: UserDB): + response = await test_app_client.get( + "/optional-current-active-user", + headers={"Authorization": f"Bearer {user.id}"}, + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is not None + + +@pytest.mark.fastapi_users +@pytest.mark.asyncio +class TestOptionalGetCurrentSuperuser: + async def test_missing_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get("/optional-current-superuser") + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_invalid_token(self, test_app_client: httpx.AsyncClient): + response = await test_app_client.get( + "/optional-current-superuser", headers={"Authorization": "Bearer foo"} + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_valid_token_regular_user( + self, test_app_client: httpx.AsyncClient, user: UserDB + ): + response = await test_app_client.get( + "/optional-current-superuser", + headers={"Authorization": f"Bearer {user.id}"}, + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is None + + async def test_valid_token_superuser( + self, test_app_client: httpx.AsyncClient, superuser: UserDB + ): + response = await test_app_client.get( + "/optional-current-superuser", + headers={"Authorization": f"Bearer {superuser.id}"}, + ) + assert response.status_code == status.HTTP_200_OK + assert response.json() is not None