mirror of
https://github.com/fastapi-users/fastapi-users.git
synced 2025-08-15 19:30:47 +08:00
72 lines
2.1 KiB
Python
72 lines
2.1 KiB
Python
from typing import Generic, List, Optional
|
|
|
|
import jwt
|
|
from pydantic import UUID4
|
|
|
|
from fastapi_users import models
|
|
from fastapi_users.authentication.strategy.base import (
|
|
Strategy,
|
|
StrategyDestroyNotSupportedError,
|
|
)
|
|
from fastapi_users.jwt import SecretType, decode_jwt, generate_jwt
|
|
from fastapi_users.manager import BaseUserManager, UserNotExists
|
|
|
|
|
|
class JWTStrategy(Strategy, Generic[models.UP]):
|
|
def __init__(
|
|
self,
|
|
secret: SecretType,
|
|
lifetime_seconds: Optional[int],
|
|
token_audience: List[str] = ["fastapi-users:auth"],
|
|
algorithm: str = "HS256",
|
|
public_key: Optional[SecretType] = None,
|
|
):
|
|
self.secret = secret
|
|
self.lifetime_seconds = lifetime_seconds
|
|
self.token_audience = token_audience
|
|
self.algorithm = algorithm
|
|
self.public_key = public_key
|
|
|
|
@property
|
|
def encode_key(self) -> SecretType:
|
|
return self.secret
|
|
|
|
@property
|
|
def decode_key(self) -> SecretType:
|
|
return self.public_key or self.secret
|
|
|
|
async def read_token(
|
|
self, token: Optional[str], user_manager: BaseUserManager[models.UP]
|
|
) -> Optional[models.UP]:
|
|
if token is None:
|
|
return None
|
|
|
|
try:
|
|
data = decode_jwt(
|
|
token, self.decode_key, self.token_audience, algorithms=[self.algorithm]
|
|
)
|
|
user_id = data.get("user_id")
|
|
if user_id is None:
|
|
return None
|
|
except jwt.PyJWTError:
|
|
return None
|
|
|
|
try:
|
|
user_uiid = UUID4(user_id)
|
|
return await user_manager.get(user_uiid)
|
|
except ValueError:
|
|
return None
|
|
except UserNotExists:
|
|
return None
|
|
|
|
async def write_token(self, user: models.UP) -> str:
|
|
data = {"user_id": str(user.id), "aud": self.token_audience}
|
|
return generate_jwt(
|
|
data, self.encode_key, self.lifetime_seconds, algorithm=self.algorithm
|
|
)
|
|
|
|
async def destroy_token(self, token: str, user: models.UP) -> None:
|
|
raise StrategyDestroyNotSupportedError(
|
|
"A JWT can't be invalidated: it's valid until it expires."
|
|
)
|