Add get_optional_* dependency callables

This commit is contained in:
François Voron
2020-05-25 08:13:42 +02:00
parent 71ca7d85ed
commit 9508768502
4 changed files with 196 additions and 15 deletions

View File

@ -10,9 +10,9 @@
Get the current user (**active or not**). Will throw a `401 Unauthorized` if missing or wrong credentials.
```py
@app.get('/protected-route')
@app.get("/protected-route")
def protected_route(user: User = Depends(fastapi_users.get_current_user)):
return f'Hello, {user.email}'
return f"Hello, {user.email}"
```
## `get_current_active_user`
@ -20,9 +20,9 @@ def protected_route(user: User = Depends(fastapi_users.get_current_user)):
Get the current active user. Will throw a `401 Unauthorized` if missing or wrong credentials or if the user is not active.
```py
@app.get('/protected-route')
@app.get("/protected-route")
def protected_route(user: User = Depends(fastapi_users.get_current_active_user)):
return f'Hello, {user.email}'
return f"Hello, {user.email}"
```
## `get_current_superuser`
@ -30,9 +30,48 @@ def protected_route(user: User = Depends(fastapi_users.get_current_active_user))
Get the current superuser. Will throw a `401 Unauthorized` if missing or wrong credentials or if the user is not active. Will throw a `403 Forbidden` if the user is not a superuser.
```py
@app.get('/protected-route')
@app.get("/protected-route")
def protected_route(user: User = Depends(fastapi_users.get_current_superuser)):
return f'Hello, {user.email}'
return f"Hello, {user.email}"
```
## `get_optional_current_user`
Get the current user (**active or not**). Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not.
```py
@app.get("/optional-user-route")
def optional_user_route(user: Optional[User] = Depends(fastapi_users.get_optional_current_user)):
if user:
return f"Hello, {user.email}"
else:
return "Hello, anonymous"
```
## `get_optional_current_active_user`
Get the current active user. Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not.
```py
@app.get("/optional-user-route")
def optional_user_route(user: User = Depends(fastapi_users.get_optional_current_active_user)):
if user:
return f"Hello, {user.email}"
else:
return "Hello, anonymous"
```
## `get_optional_current_superuser`
Get the current superuser. Will return `None` if missing or wrong credentials. It can be useful if you wish to change the behaviour of your endpoint if a user is logged in or not.
```py
@app.get("/optional-user-route")
def optional_user_route(user: User = Depends(fastapi_users.get_optional_current_superuser)):
if user:
return f"Hello, {user.email}"
else:
return "Hello, anonymous"
```
## In path operation
@ -40,9 +79,9 @@ def protected_route(user: User = Depends(fastapi_users.get_current_superuser)):
If you don't need a user, you can use more clear way:
```py
@app.get('/protected-route', dependencies=[Depends(fastapi_users.get_current_superuser)])
@app.get("/protected-route", dependencies=[Depends(fastapi_users.get_current_superuser)])
def protected_route():
return 'Hello, some user.'
return "Hello, some user."
```
You can read more about this [in FastAPI docs](https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-in-path-operation-decorators/).

View File

@ -1,6 +1,6 @@
import re
from inspect import Parameter, Signature
from typing import Sequence
from typing import Optional, Sequence
from fastapi import Depends, HTTPException, status
from makefun import with_signature
@ -64,20 +64,43 @@ class Authenticator:
except ValueError:
raise DuplicateBackendNamesError()
@with_signature(signature, func_name="get_optional_current_user")
async def get_optional_current_user(*args, **kwargs):
return await self._authenticate(*args, **kwargs)
@with_signature(signature, func_name="get_optional_current_active_user")
async def get_optional_current_active_user(*args, **kwargs):
user = await get_optional_current_user(*args, **kwargs)
if not user or not user.is_active:
return None
return user
@with_signature(signature, func_name="get_optional_current_superuser")
async def get_optional_current_superuser(*args, **kwargs):
user = await get_optional_current_active_user(*args, **kwargs)
if not user or not user.is_superuser:
return None
return user
@with_signature(signature, func_name="get_current_user")
async def get_current_user(*args, **kwargs):
return await self._authenticate(*args, **kwargs)
user = await get_optional_current_user(*args, **kwargs)
if user is None:
raise self._get_credentials_exception()
return user
@with_signature(signature, func_name="get_current_active_user")
async def get_current_active_user(*args, **kwargs):
user = await get_current_user(*args, **kwargs)
if not user.is_active:
user = await get_optional_current_active_user(*args, **kwargs)
if user is None:
raise self._get_credentials_exception()
return user
@with_signature(signature, func_name="get_current_superuser")
async def get_current_superuser(*args, **kwargs):
user = await get_current_active_user(*args, **kwargs)
user = await get_optional_current_active_user(*args, **kwargs)
if user is None:
raise self._get_credentials_exception()
if not user.is_superuser:
raise self._get_credentials_exception(status.HTTP_403_FORBIDDEN)
return user
@ -85,15 +108,18 @@ class Authenticator:
self.get_current_user = get_current_user
self.get_current_active_user = get_current_active_user
self.get_current_superuser = get_current_superuser
self.get_optional_current_user = get_optional_current_user
self.get_optional_current_active_user = get_optional_current_active_user
self.get_optional_current_superuser = get_optional_current_superuser
async def _authenticate(self, *args, **kwargs) -> BaseUserDB:
async def _authenticate(self, *args, **kwargs) -> Optional[BaseUserDB]:
for backend in self.backends:
token: str = kwargs[name_to_variable_name(backend.name)]
if token:
user = await backend(token, self.user_db)
if user is not None:
return user
raise self._get_credentials_exception()
return None
def _get_credentials_exception(
self, status_code: int = status.HTTP_401_UNAUTHORIZED

View File

@ -59,6 +59,13 @@ class FastAPIUsers:
self.get_current_user = self.authenticator.get_current_user
self.get_current_active_user = self.authenticator.get_current_active_user
self.get_current_superuser = self.authenticator.get_current_superuser
self.get_optional_current_user = self.authenticator.get_optional_current_user
self.get_optional_current_active_user = (
self.authenticator.get_optional_current_active_user
)
self.get_optional_current_superuser = (
self.authenticator.get_optional_current_superuser
)
def get_register_router(
self, after_register: Optional[Callable[[models.UD, Request], None]] = None,

View File

@ -34,6 +34,22 @@ async def test_app_client(
def current_superuser(user=Depends(fastapi_users.get_current_superuser)):
return user
@app.get("/optional-current-user")
def optional_current_user(user=Depends(fastapi_users.get_optional_current_user)):
return user
@app.get("/optional-current-active-user")
def optional_current_active_user(
user=Depends(fastapi_users.get_optional_current_active_user),
):
return user
@app.get("/optional-current-superuser")
def optional_current_superuser(
user=Depends(fastapi_users.get_optional_current_superuser),
):
return user
return await get_test_client(app)
@ -161,3 +177,96 @@ class TestGetCurrentSuperuser:
"/current-superuser", headers={"Authorization": f"Bearer {superuser.id}"}
)
assert response.status_code == status.HTTP_200_OK
@pytest.mark.fastapi_users
@pytest.mark.asyncio
class TestOptionalGetCurrentUser:
async def test_missing_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get("/optional-current-user")
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_invalid_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get(
"/optional-current-user", headers={"Authorization": "Bearer foo"}
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_valid_token(self, test_app_client: httpx.AsyncClient, user: UserDB):
response = await test_app_client.get(
"/optional-current-user", headers={"Authorization": f"Bearer {user.id}"}
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is not None
@pytest.mark.fastapi_users
@pytest.mark.asyncio
class TestOptionalGetCurrentActiveUser:
async def test_missing_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get("/optional-current-active-user")
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_invalid_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get(
"/optional-current-active-user", headers={"Authorization": "Bearer foo"}
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_valid_token_inactive_user(
self, test_app_client: httpx.AsyncClient, inactive_user: UserDB
):
response = await test_app_client.get(
"/optional-current-active-user",
headers={"Authorization": f"Bearer {inactive_user.id}"},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_valid_token(self, test_app_client: httpx.AsyncClient, user: UserDB):
response = await test_app_client.get(
"/optional-current-active-user",
headers={"Authorization": f"Bearer {user.id}"},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is not None
@pytest.mark.fastapi_users
@pytest.mark.asyncio
class TestOptionalGetCurrentSuperuser:
async def test_missing_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get("/optional-current-superuser")
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_invalid_token(self, test_app_client: httpx.AsyncClient):
response = await test_app_client.get(
"/optional-current-superuser", headers={"Authorization": "Bearer foo"}
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_valid_token_regular_user(
self, test_app_client: httpx.AsyncClient, user: UserDB
):
response = await test_app_client.get(
"/optional-current-superuser",
headers={"Authorization": f"Bearer {user.id}"},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is None
async def test_valid_token_superuser(
self, test_app_client: httpx.AsyncClient, superuser: UserDB
):
response = await test_app_client.get(
"/optional-current-superuser",
headers={"Authorization": f"Bearer {superuser.id}"},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() is not None