Implement JWT authentication

This commit is contained in:
François Voron
2019-10-08 17:18:38 +02:00
parent 20aa806375
commit 06dd8ad22e
10 changed files with 233 additions and 37 deletions

View File

@ -1,15 +1,21 @@
import pytest
from fastapi import HTTPException
from starlette import status
from starlette.responses import Response
from fastapi_users.authentication import BaseAuthentication
from fastapi_users.db import BaseUserDatabase
from fastapi_users.models import UserDB
from fastapi_users.password import get_password_hash
active_user = UserDB(
active_user_data = UserDB(
id='aaa',
email='king.arthur@camelot.bt',
hashed_password=get_password_hash('guinevere'),
)
inactive_user = UserDB(
inactive_user_data = UserDB(
id='bbb',
email='percival@camelot.bt',
hashed_password=get_password_hash('angharad'),
is_active=False
@ -18,16 +24,28 @@ inactive_user = UserDB(
@pytest.fixture
def user() -> UserDB:
return active_user
return active_user_data
class MockUserDBInterface(BaseUserDatabase):
@pytest.fixture
def inactive_user() -> UserDB:
return inactive_user_data
class MockUserDatabase(BaseUserDatabase):
async def get(self, id: str) -> UserDB:
if id == active_user_data.id:
return active_user_data
elif id == inactive_user_data.id:
return inactive_user_data
return None
async def get_by_email(self, email: str) -> UserDB:
if email == active_user.email:
return active_user
elif email == inactive_user.email:
return inactive_user
if email == active_user_data.email:
return active_user_data
elif email == inactive_user_data.email:
return inactive_user_data
return None
async def create(self, user: UserDB) -> UserDB:
@ -35,5 +53,22 @@ class MockUserDBInterface(BaseUserDatabase):
@pytest.fixture
def mock_db_interface() -> MockUserDBInterface:
return MockUserDBInterface()
def mock_user_db() -> MockUserDatabase:
return MockUserDatabase()
class MockAuthentication(BaseAuthentication):
async def get_login_response(self, user: UserDB, response: Response):
return {'token': user.id}
async def authenticate(self, token: str) -> UserDB:
user = await self.userDB.get(token)
if user is None or not user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
return user
@pytest.fixture
def mock_authentication(mock_user_db) -> MockAuthentication:
return MockAuthentication(mock_user_db)

View File

@ -0,0 +1,70 @@
import jwt
import pytest
from fastapi import Depends, FastAPI
from starlette import status
from starlette.responses import Response
from starlette.testclient import TestClient
from fastapi_users.authentication.jwt import JWTAuthentication, generate_jwt
from fastapi_users.models import UserDB
SECRET = 'SECRET'
ALGORITHM = 'HS256'
LIFETIME = 3600
@pytest.fixture
def jwt_authentication(mock_user_db):
return JWTAuthentication(SECRET, LIFETIME, mock_user_db)
@pytest.fixture
def token():
def _token(user, lifetime=LIFETIME):
data = {'user_id': user.id}
return generate_jwt(data, lifetime, SECRET, ALGORITHM)
return _token
@pytest.fixture
def test_auth_client(jwt_authentication):
app = FastAPI()
@app.get('/test-auth')
def test_auth(user: UserDB = Depends(jwt_authentication.get_authentication_method())):
return user
return TestClient(app)
@pytest.mark.asyncio
async def test_get_login_response(jwt_authentication, user):
login_response = await jwt_authentication.get_login_response(user, Response())
assert 'token' in login_response
token = login_response['token']
decoded = jwt.decode(token, SECRET, algorithms=[ALGORITHM])
assert decoded['user_id'] == user.id
class TestGetAuthenticationMethod:
def test_missing_token(self, test_auth_client):
response = test_auth_client.get('/test-auth')
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_invalid_token(self, test_auth_client):
response = test_auth_client.get('/test-auth', headers={'Authorization': 'Bearer foo'})
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_valid_token_inactive_user(self, test_auth_client, token, inactive_user):
response = test_auth_client.get('/test-auth', headers={'Authorization': f'Bearer {token(inactive_user)}'})
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_valid_token(self, test_auth_client, token, user):
response = test_auth_client.get('/test-auth', headers={'Authorization': f'Bearer {token(user)}'})
assert response.status_code == status.HTTP_200_OK
json = response.json()
assert json['id'] == user.id

View File

@ -1,5 +1,4 @@
import pytest
from fastapi.security import OAuth2PasswordRequestForm
@ -11,27 +10,26 @@ def create_oauth2_password_request_form():
password=password,
scope='',
)
return _create_oauth2_password_request_form
class TestAuthenticate:
@pytest.mark.asyncio
async def test_unknown_user(self, create_oauth2_password_request_form, mock_db_interface):
async def test_unknown_user(self, create_oauth2_password_request_form, mock_user_db):
form = create_oauth2_password_request_form('lancelot@camelot.bt', 'guinevere')
user = await mock_db_interface.authenticate(form)
user = await mock_user_db.authenticate(form)
assert user is None
@pytest.mark.asyncio
async def test_wrong_password(self, create_oauth2_password_request_form, mock_db_interface):
async def test_wrong_password(self, create_oauth2_password_request_form, mock_user_db):
form = create_oauth2_password_request_form('king.arthur@camelot.bt', 'percival')
user = await mock_db_interface.authenticate(form)
user = await mock_user_db.authenticate(form)
assert user is None
@pytest.mark.asyncio
async def test_valid_credentials(self, create_oauth2_password_request_form, mock_db_interface):
async def test_valid_credentials(self, create_oauth2_password_request_form, mock_user_db):
form = create_oauth2_password_request_form('king.arthur@camelot.bt', 'guinevere')
user = await mock_db_interface.authenticate(form)
user = await mock_user_db.authenticate(form)
assert user is not None
assert user.email == 'king.arthur@camelot.bt'

View File

@ -1,14 +1,15 @@
import pytest
from fastapi import FastAPI
from starlette import status
from starlette.testclient import TestClient
from fastapi_users.models import UserDB
from fastapi_users.router import UserRouter
@pytest.fixture
def test_app_client(mock_db_interface) -> TestClient:
from fastapi import FastAPI
from fastapi_users.router import UserRouter
userRouter = UserRouter(mock_db_interface)
def test_app_client(mock_user_db, mock_authentication) -> TestClient:
userRouter = UserRouter(mock_user_db, mock_authentication)
app = FastAPI()
app.include_router(userRouter)
@ -82,13 +83,14 @@ class TestLogin:
response = test_app_client.post('/login', data=data)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_valid_credentials(self, test_app_client: TestClient):
def test_valid_credentials(self, test_app_client: TestClient, user: UserDB):
data = {
'username': 'king.arthur@camelot.bt',
'password': 'guinevere',
}
response = test_app_client.post('/login', data=data)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {'token': user.id}
def test_inactive_user(self, test_app_client: TestClient):
data = {