Drop Python 3.9 support

This commit is contained in:
François Voron
2025-10-25 08:19:03 +02:00
parent ae5ff025ef
commit fcf9a2041a
33 changed files with 224 additions and 234 deletions

View File

@ -3,56 +3,55 @@ name: Build
on: [push, pull_request]
jobs:
lint:
runs-on: ubuntu-latest
strategy:
matrix:
python_version: [3.9, '3.10', '3.11', '3.12', '3.13']
python_version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python_version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Lint and typecheck
run: |
hatch run lint-check
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python_version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Lint and typecheck
run: |
hatch run lint-check
test:
runs-on: ubuntu-latest
strategy:
matrix:
python_version: [3.9, '3.10', '3.11', '3.12', '3.13']
python_version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python_version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Test
run: |
hatch run test:test-cov-xml
- uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
- name: Build and install it on system host
run: |
hatch build
pip install dist/fastapi_users-*.whl
python test_build.py
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python_version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Test
run: |
hatch run test:test-cov-xml
- uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: true
- name: Build and install it on system host
run: |
hatch build
pip install dist/fastapi_users-*.whl
python test_build.py
release:
runs-on: ubuntu-latest
@ -60,27 +59,27 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
steps:
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: 3.9
- name: Install dependencies
shell: bash
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Build and publish on PyPI
env:
HATCH_INDEX_USER: ${{ secrets.HATCH_INDEX_USER }}
HATCH_INDEX_AUTH: ${{ secrets.HATCH_INDEX_AUTH }}
run: |
hatch build
hatch publish
- name: Create release
uses: ncipollo/release-action@v1
with:
draft: true
body: ${{ github.event.head_commit.message }}
artifacts: dist/*.whl,dist/*.tar.gz
token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/checkout@v5
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: 3.10
- name: Install dependencies
shell: bash
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Build and publish on PyPI
env:
HATCH_INDEX_USER: ${{ secrets.HATCH_INDEX_USER }}
HATCH_INDEX_AUTH: ${{ secrets.HATCH_INDEX_AUTH }}
run: |
hatch build
hatch publish
- name: Create release
uses: ncipollo/release-action@v1
with:
draft: true
body: ${{ github.event.head_commit.message }}
artifacts: dist/*.whl,dist/*.tar.gz
token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -16,7 +16,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v6
with:
python-version: 3.9
python-version: 3.10
- name: Install dependencies
shell: bash
run: |

View File

@ -1,5 +1,4 @@
import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, UUIDIDMixin
@ -13,16 +12,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
async def on_after_register(self, user: User, request: Request | None = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,5 +1,4 @@
import os
from typing import Optional
from beanie import PydanticObjectId
from fastapi import Depends, Request
@ -26,16 +25,16 @@ class UserManager(ObjectIDIDMixin, BaseUserManager[User, PydanticObjectId]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
async def on_after_register(self, user: User, request: Request | None = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,5 +1,3 @@
from typing import Optional
from beanie import PydanticObjectId
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers
@ -19,16 +17,16 @@ class UserManager(ObjectIDIDMixin, BaseUserManager[User, PydanticObjectId]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
async def on_after_register(self, user: User, request: Request | None = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,6 +1,5 @@
import os
import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
@ -26,16 +25,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
async def on_after_register(self, user: User, request: Request | None = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,5 +1,4 @@
import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
@ -19,16 +18,16 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
async def on_after_register(self, user: User, request: Request | None = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
self, user: User, token: str, request: Request | None = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")

View File

@ -1,7 +1,7 @@
import re
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from inspect import Parameter, Signature
from typing import Any, Callable, Generic, Optional, cast
from typing import Any, Generic, cast
from fastapi import Depends, HTTPException, status
from makefun import with_signature
@ -65,9 +65,8 @@ class Authenticator(Generic[models.UP, models.ID]):
active: bool = False,
verified: bool = False,
superuser: bool = False,
get_enabled_backends: Optional[
EnabledBackendsDependency[models.UP, models.ID]
] = None,
get_enabled_backends: EnabledBackendsDependency[models.UP, models.ID]
| None = None,
):
"""
Return a dependency callable to retrieve currently authenticated user and token.
@ -111,9 +110,8 @@ class Authenticator(Generic[models.UP, models.ID]):
active: bool = False,
verified: bool = False,
superuser: bool = False,
get_enabled_backends: Optional[
EnabledBackendsDependency[models.UP, models.ID]
] = None,
get_enabled_backends: EnabledBackendsDependency[models.UP, models.ID]
| None = None,
):
"""
Return a dependency callable to retrieve currently authenticated user.
@ -161,9 +159,9 @@ class Authenticator(Generic[models.UP, models.ID]):
verified: bool = False,
superuser: bool = False,
**kwargs,
) -> tuple[Optional[models.UP], Optional[str]]:
user: Optional[models.UP] = None
token: Optional[str] = None
) -> tuple[models.UP | None, str | None]:
user: models.UP | None = None
token: str | None = None
enabled_backends: Sequence[AuthenticationBackend[models.UP, models.ID]] = (
kwargs.get("enabled_backends", self.backends)
)
@ -193,7 +191,7 @@ class Authenticator(Generic[models.UP, models.ID]):
return user, token
def _get_dependency_signature(
self, get_enabled_backends: Optional[EnabledBackendsDependency] = None
self, get_enabled_backends: EnabledBackendsDependency | None = None
) -> Signature:
"""
Generate a dynamic signature for the current_user dependency.

View File

@ -1,4 +1,4 @@
from typing import Generic, Optional, Protocol
from typing import Generic, Protocol
from fastapi_users import models
from fastapi_users.manager import BaseUserManager
@ -10,8 +10,8 @@ class StrategyDestroyNotSupportedError(Exception):
class Strategy(Protocol, Generic[models.UP, models.ID]):
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]: ... # pragma: no cover
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None: ... # pragma: no cover
async def write_token(self, user: models.UP) -> str: ... # pragma: no cover

View File

@ -1,5 +1,5 @@
from datetime import datetime
from typing import Any, Generic, Optional, Protocol
from typing import Any, Generic, Protocol
from fastapi_users.authentication.strategy.db.models import AP
@ -8,8 +8,8 @@ class AccessTokenDatabase(Protocol, Generic[AP]):
"""Protocol for retrieving, creating and updating access tokens from a database."""
async def get_by_token(
self, token: str, max_age: Optional[datetime] = None
) -> Optional[AP]:
self, token: str, max_age: datetime | None = None
) -> AP | None:
"""Get a single access token by token."""
... # pragma: no cover

View File

@ -1,6 +1,6 @@
import secrets
from datetime import datetime, timedelta, timezone
from typing import Any, Generic, Optional
from typing import Any, Generic
from fastapi_users import exceptions, models
from fastapi_users.authentication.strategy.base import Strategy
@ -13,14 +13,14 @@ class DatabaseStrategy(
Strategy[models.UP, models.ID], Generic[models.UP, models.ID, AP]
):
def __init__(
self, database: AccessTokenDatabase[AP], lifetime_seconds: Optional[int] = None
self, database: AccessTokenDatabase[AP], lifetime_seconds: int | None = None
):
self.database = database
self.lifetime_seconds = lifetime_seconds
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
if token is None:
return None

View File

@ -1,4 +1,4 @@
from typing import Generic, Optional
from typing import Generic
import jwt
@ -21,10 +21,10 @@ class JWTStrategy(Strategy[models.UP, models.ID], Generic[models.UP, models.ID])
def __init__(
self,
secret: SecretType,
lifetime_seconds: Optional[int],
lifetime_seconds: int | None,
token_audience: list[str] = ["fastapi-users:auth"],
algorithm: str = "HS256",
public_key: Optional[SecretType] = None,
public_key: SecretType | None = None,
):
self.secret = secret
self.lifetime_seconds = lifetime_seconds
@ -41,8 +41,8 @@ class JWTStrategy(Strategy[models.UP, models.ID], Generic[models.UP, models.ID])
return self.public_key or self.secret
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
if token is None:
return None

View File

@ -1,5 +1,5 @@
import secrets
from typing import Generic, Optional
from typing import Generic
import redis.asyncio
@ -12,7 +12,7 @@ class RedisStrategy(Strategy[models.UP, models.ID], Generic[models.UP, models.ID
def __init__(
self,
redis: redis.asyncio.Redis,
lifetime_seconds: Optional[int] = None,
lifetime_seconds: int | None = None,
*,
key_prefix: str = "fastapi_users_token:",
):
@ -21,8 +21,8 @@ class RedisStrategy(Strategy[models.UP, models.ID], Generic[models.UP, models.ID
self.key_prefix = key_prefix
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
if token is None:
return None

View File

@ -1,4 +1,4 @@
from typing import Literal, Optional
from typing import Literal
from fastapi import Response, status
from fastapi.security import APIKeyCookie
@ -13,9 +13,9 @@ class CookieTransport(Transport):
def __init__(
self,
cookie_name: str = "fastapiusersauth",
cookie_max_age: Optional[int] = None,
cookie_max_age: int | None = None,
cookie_path: str = "/",
cookie_domain: Optional[str] = None,
cookie_domain: str | None = None,
cookie_secure: bool = True,
cookie_httponly: bool = True,
cookie_samesite: Literal["lax", "strict", "none"] = "lax",

View File

@ -1,4 +1,4 @@
from typing import Any, Generic, Optional
from typing import Any, Generic
from fastapi_users.models import ID, OAP, UOAP, UP
from fastapi_users.types import DependencyCallable
@ -7,15 +7,15 @@ from fastapi_users.types import DependencyCallable
class BaseUserDatabase(Generic[UP, ID]):
"""Base adapter for retrieving, creating and updating users from a database."""
async def get(self, id: ID) -> Optional[UP]:
async def get(self, id: ID) -> UP | None:
"""Get a single user by id."""
raise NotImplementedError()
async def get_by_email(self, email: str) -> Optional[UP]:
async def get_by_email(self, email: str) -> UP | None:
"""Get a single user by email."""
raise NotImplementedError()
async def get_by_oauth_account(self, oauth: str, account_id: str) -> Optional[UP]:
async def get_by_oauth_account(self, oauth: str, account_id: str) -> UP | None:
"""Get a single user by OAuth account id."""
raise NotImplementedError()

View File

@ -1,5 +1,5 @@
from collections.abc import Sequence
from typing import Generic, Optional
from typing import Generic
from fastapi import APIRouter
@ -96,7 +96,7 @@ class FastAPIUsers(Generic[models.UP, models.ID]):
oauth_client: BaseOAuth2,
backend: AuthenticationBackend[models.UP, models.ID],
state_secret: SecretType,
redirect_url: Optional[str] = None,
redirect_url: str | None = None,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
) -> APIRouter:
@ -129,7 +129,7 @@ class FastAPIUsers(Generic[models.UP, models.ID]):
oauth_client: BaseOAuth2,
user_schema: type[schemas.U],
state_secret: SecretType,
redirect_url: Optional[str] = None,
redirect_url: str | None = None,
requires_verification: bool = False,
) -> APIRouter:
"""

View File

@ -1,10 +1,10 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Union
from typing import Any
import jwt
from pydantic import SecretStr
SecretType = Union[str, SecretStr]
SecretType = str | SecretStr
JWT_ALGORITHM = "HS256"
@ -17,7 +17,7 @@ def _get_secret_value(secret: SecretType) -> str:
def generate_jwt(
data: dict,
secret: SecretType,
lifetime_seconds: Optional[int] = None,
lifetime_seconds: int | None = None,
algorithm: str = JWT_ALGORITHM,
) -> str:
payload = data.copy()

View File

@ -1,5 +1,5 @@
import uuid
from typing import Any, Generic, Optional, Union
from typing import Any, Generic
import jwt
from fastapi import Request, Response
@ -43,7 +43,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
def __init__(
self,
user_db: BaseUserDatabase[models.UP, models.ID],
password_helper: Optional[PasswordHelperProtocol] = None,
password_helper: PasswordHelperProtocol | None = None,
):
self.user_db = user_db
if password_helper is None:
@ -111,7 +111,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
self,
user_create: schemas.UC,
safe: bool = False,
request: Optional[Request] = None,
request: Request | None = None,
) -> models.UP:
"""
Create a user in database.
@ -152,9 +152,9 @@ class BaseUserManager(Generic[models.UP, models.ID]):
access_token: str,
account_id: str,
account_email: str,
expires_at: Optional[int] = None,
refresh_token: Optional[str] = None,
request: Optional[Request] = None,
expires_at: int | None = None,
refresh_token: str | None = None,
request: Request | None = None,
*,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
@ -237,9 +237,9 @@ class BaseUserManager(Generic[models.UP, models.ID]):
access_token: str,
account_id: str,
account_email: str,
expires_at: Optional[int] = None,
refresh_token: Optional[str] = None,
request: Optional[Request] = None,
expires_at: int | None = None,
refresh_token: str | None = None,
request: Request | None = None,
) -> models.UOAP:
"""
Handle the callback after a successful OAuth association.
@ -273,7 +273,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return user
async def request_verify(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Start a verification request.
@ -303,7 +303,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
)
await self.on_after_request_verify(user, token, request)
async def verify(self, token: str, request: Optional[Request] = None) -> models.UP:
async def verify(self, token: str, request: Request | None = None) -> models.UP:
"""
Validate a verification request.
@ -356,7 +356,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return verified_user
async def forgot_password(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Start a forgot password request.
@ -384,7 +384,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
await self.on_after_forgot_password(user, token, request)
async def reset_password(
self, token: str, password: str, request: Optional[Request] = None
self, token: str, password: str, request: Request | None = None
) -> models.UP:
"""
Reset the password of a user.
@ -442,7 +442,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
user_update: schemas.UU,
user: models.UP,
safe: bool = False,
request: Optional[Request] = None,
request: Request | None = None,
) -> models.UP:
"""
Update a user.
@ -469,7 +469,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
async def delete(
self,
user: models.UP,
request: Optional[Request] = None,
request: Request | None = None,
) -> None:
"""
Delete a user.
@ -483,7 +483,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
await self.on_after_delete(user, request)
async def validate_password(
self, password: str, user: Union[schemas.UC, models.UP]
self, password: str, user: schemas.UC | models.UP
) -> None:
"""
Validate a password.
@ -498,7 +498,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_register(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Perform logic after successful user registration.
@ -515,7 +515,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
self,
user: models.UP,
update_dict: dict[str, Any],
request: Optional[Request] = None,
request: Request | None = None,
) -> None:
"""
Perform logic after successful user update.
@ -530,7 +530,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_request_verify(
self, user: models.UP, token: str, request: Optional[Request] = None
self, user: models.UP, token: str, request: Request | None = None
) -> None:
"""
Perform logic after successful verification request.
@ -545,7 +545,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_verify(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Perform logic after successful user verification.
@ -559,7 +559,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_forgot_password(
self, user: models.UP, token: str, request: Optional[Request] = None
self, user: models.UP, token: str, request: Request | None = None
) -> None:
"""
Perform logic after successful forgot password request.
@ -574,7 +574,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_reset_password(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Perform logic after successful password reset.
@ -590,8 +590,8 @@ class BaseUserManager(Generic[models.UP, models.ID]):
async def on_after_login(
self,
user: models.UP,
request: Optional[Request] = None,
response: Optional[Response] = None,
request: Request | None = None,
response: Response | None = None,
) -> None:
"""
Perform logic after user login.
@ -606,7 +606,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_before_delete(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Perform logic before user delete.
@ -620,7 +620,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
return # pragma: no cover
async def on_after_delete(
self, user: models.UP, request: Optional[Request] = None
self, user: models.UP, request: Request | None = None
) -> None:
"""
Perform logic before user delete.
@ -635,7 +635,7 @@ class BaseUserManager(Generic[models.UP, models.ID]):
async def authenticate(
self, credentials: OAuth2PasswordRequestForm
) -> Optional[models.UP]:
) -> models.UP | None:
"""
Authenticate and return a user following an email and a password.

View File

@ -1,4 +1,4 @@
from typing import Generic, Optional, Protocol, TypeVar
from typing import Generic, Protocol, TypeVar
ID = TypeVar("ID")
@ -20,8 +20,8 @@ class OAuthAccountProtocol(Protocol[ID]):
id: ID
oauth_name: str
access_token: str
expires_at: Optional[int]
refresh_token: Optional[str]
expires_at: int | None
refresh_token: str | None
account_id: str
account_email: str

View File

@ -1,3 +1,3 @@
from typing import Any, Union
from typing import Any
OpenAPIResponseType = dict[Union[int, str], dict[str, Any]]
OpenAPIResponseType = dict[int | str, dict[str, Any]]

View File

@ -1,5 +1,5 @@
import secrets
from typing import Optional, Protocol, Union
from typing import Protocol
from pwdlib import PasswordHash
from pwdlib.hashers.argon2 import Argon2Hasher
@ -9,7 +9,7 @@ from pwdlib.hashers.bcrypt import BcryptHasher
class PasswordHelperProtocol(Protocol):
def verify_and_update(
self, plain_password: str, hashed_password: str
) -> tuple[bool, Union[str, None]]: ... # pragma: no cover
) -> tuple[bool, str | None]: ... # pragma: no cover
def hash(self, password: str) -> str: ... # pragma: no cover
@ -17,7 +17,7 @@ class PasswordHelperProtocol(Protocol):
class PasswordHelper(PasswordHelperProtocol):
def __init__(self, password_hash: Optional[PasswordHash] = None) -> None:
def __init__(self, password_hash: PasswordHash | None = None) -> None:
if password_hash is None:
self.password_hash = PasswordHash(
(
@ -30,7 +30,7 @@ class PasswordHelper(PasswordHelperProtocol):
def verify_and_update(
self, plain_password: str, hashed_password: str
) -> tuple[bool, Union[str, None]]:
) -> tuple[bool, str | None]:
return self.password_hash.verify_and_update(plain_password, hashed_password)
def hash(self, password: str) -> str:

View File

@ -1,11 +1,10 @@
from enum import Enum
from typing import Union
from pydantic import BaseModel
class ErrorModel(BaseModel):
detail: Union[str, dict[str, str]]
detail: str | dict[str, str]
class ErrorCodeReasonModel(BaseModel):

View File

@ -1,5 +1,3 @@
from typing import Optional
import jwt
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from httpx_oauth.integrations.fastapi import OAuth2AuthorizeCallback
@ -32,7 +30,7 @@ def get_oauth_router(
backend: AuthenticationBackend[models.UP, models.ID],
get_user_manager: UserManagerDependency[models.UP, models.ID],
state_secret: SecretType,
redirect_url: Optional[str] = None,
redirect_url: str | None = None,
associate_by_email: bool = False,
is_verified_by_default: bool = False,
) -> APIRouter:
@ -160,7 +158,7 @@ def get_oauth_associate_router(
get_user_manager: UserManagerDependency[models.UP, models.ID],
user_schema: type[schemas.U],
state_secret: SecretType,
redirect_url: Optional[str] = None,
redirect_url: str | None = None,
requires_verification: bool = False,
) -> APIRouter:
"""Generate a router with the OAuth routes to associate an authenticated user."""

View File

@ -1,4 +1,4 @@
from typing import Any, Generic, Optional, TypeVar
from typing import Any, Generic, TypeVar
from pydantic import BaseModel, ConfigDict, EmailStr
from pydantic.version import VERSION as PYDANTIC_VERSION
@ -64,17 +64,17 @@ class BaseUser(CreateUpdateDictModel, Generic[models.ID]):
class BaseUserCreate(CreateUpdateDictModel):
email: EmailStr
password: str
is_active: Optional[bool] = True
is_superuser: Optional[bool] = False
is_verified: Optional[bool] = False
is_active: bool | None = True
is_superuser: bool | None = False
is_verified: bool | None = False
class BaseUserUpdate(CreateUpdateDictModel):
password: Optional[str] = None
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
is_superuser: Optional[bool] = None
is_verified: Optional[bool] = None
password: str | None = None
email: EmailStr | None = None
is_active: bool | None = None
is_superuser: bool | None = None
is_verified: bool | None = None
U = TypeVar("U", bound=BaseUser)
@ -88,8 +88,8 @@ class BaseOAuthAccount(BaseModel, Generic[models.ID]):
id: models.ID
oauth_name: str
access_token: str
expires_at: Optional[int] = None
refresh_token: Optional[str] = None
expires_at: int | None = None
refresh_token: str | None = None
account_id: str
account_email: str

View File

@ -1,15 +1,19 @@
from collections.abc import AsyncGenerator, AsyncIterator, Coroutine, Generator
from typing import Callable, TypeVar, Union
from collections.abc import (
AsyncGenerator,
AsyncIterator,
Callable,
Coroutine,
Generator,
)
from typing import TypeVar
RETURN_TYPE = TypeVar("RETURN_TYPE")
DependencyCallable = Callable[
...,
Union[
RETURN_TYPE,
Coroutine[None, None, RETURN_TYPE],
AsyncGenerator[RETURN_TYPE, None],
Generator[RETURN_TYPE, None, None],
AsyncIterator[RETURN_TYPE],
],
RETURN_TYPE
| Coroutine[None, None, RETURN_TYPE]
| AsyncGenerator[RETURN_TYPE, None]
| Generator[RETURN_TYPE, None, None]
| AsyncIterator[RETURN_TYPE],
]

View File

@ -33,7 +33,7 @@ markers = [
]
[tool.ruff]
target-version = "py39"
target-version = "py310"
[tool.ruff.lint]
extend-select = ["UP", "TRY"]
@ -133,15 +133,15 @@ classifiers = [
"Framework :: FastAPI",
"Framework :: AsyncIO",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Programming Language :: Python :: 3 :: Only",
"Topic :: Internet :: WWW/HTTP :: Session",
]
requires-python = ">=3.9"
requires-python = ">=3.10"
dependencies = [
"fastapi >=0.65.2",
"pwdlib[argon2,bcrypt] ==0.2.1",

View File

@ -1,8 +1,8 @@
import asyncio
import dataclasses
import uuid
from collections.abc import AsyncGenerator
from typing import Any, Callable, Generic, Optional, Union
from collections.abc import AsyncGenerator, Callable
from typing import Any, Generic
from unittest.mock import MagicMock
import httpx
@ -41,7 +41,7 @@ class UserModel(models.UserProtocol[IDType]):
is_active: bool = True
is_superuser: bool = False
is_verified: bool = False
first_name: Optional[str] = None
first_name: str | None = None
@dataclasses.dataclass
@ -51,8 +51,8 @@ class OAuthAccountModel(models.OAuthAccountProtocol[IDType]):
account_id: str
account_email: str
id: IDType = dataclasses.field(default_factory=uuid.uuid4)
expires_at: Optional[int] = None
refresh_token: Optional[str] = None
expires_at: int | None = None
refresh_token: str | None = None
@dataclasses.dataclass
@ -61,15 +61,15 @@ class UserOAuthModel(UserModel):
class User(schemas.BaseUser[IDType]):
first_name: Optional[str] = None
first_name: str | None = None
class UserCreate(schemas.BaseUserCreate):
first_name: Optional[str] = None
first_name: str | None = None
class UserUpdate(schemas.BaseUserUpdate):
first_name: Optional[str] = None
first_name: str | None = None
class UserOAuth(User, schemas.BaseOAuthAccountMixin):
@ -83,7 +83,7 @@ class BaseTestUserManager(
verification_token_secret = "SECRET"
async def validate_password(
self, password: str, user: Union[schemas.UC, models.UP]
self, password: str, user: schemas.UC | models.UP
) -> None:
if len(password) < 3:
raise exceptions.InvalidPasswordException(
@ -308,7 +308,7 @@ def mock_user_db(
verified_superuser: UserModel,
) -> BaseUserDatabase[UserModel, IDType]:
class MockUserDatabase(BaseUserDatabase[UserModel, IDType]):
async def get(self, id: UUID4) -> Optional[UserModel]:
async def get(self, id: UUID4) -> UserModel | None:
if id == user.id:
return user
if id == verified_user.id:
@ -321,7 +321,7 @@ def mock_user_db(
return verified_superuser
return None
async def get_by_email(self, email: str) -> Optional[UserModel]:
async def get_by_email(self, email: str) -> UserModel | None:
lower_email = email.lower()
if lower_email == user.email.lower():
return user
@ -360,7 +360,7 @@ def mock_user_db_oauth(
verified_superuser_oauth: UserOAuthModel,
) -> BaseUserDatabase[UserOAuthModel, IDType]:
class MockUserDatabase(BaseUserDatabase[UserOAuthModel, IDType]):
async def get(self, id: UUID4) -> Optional[UserOAuthModel]:
async def get(self, id: UUID4) -> UserOAuthModel | None:
if id == user_oauth.id:
return user_oauth
if id == verified_user_oauth.id:
@ -373,7 +373,7 @@ def mock_user_db_oauth(
return verified_superuser_oauth
return None
async def get_by_email(self, email: str) -> Optional[UserOAuthModel]:
async def get_by_email(self, email: str) -> UserOAuthModel | None:
lower_email = email.lower()
if lower_email == user_oauth.email.lower():
return user_oauth
@ -389,7 +389,7 @@ def mock_user_db_oauth(
async def get_by_oauth_account(
self, oauth: str, account_id: str
) -> Optional[UserOAuthModel]:
) -> UserOAuthModel | None:
user_oauth_account = user_oauth.oauth_accounts[0]
if (
user_oauth_account.oauth_name == oauth
@ -511,8 +511,8 @@ class MockTransport(BearerTransport):
class MockStrategy(Strategy[UserModel, IDType]):
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[UserModel, IDType]
) -> Optional[UserModel]:
self, token: str | None, user_manager: BaseUserManager[UserModel, IDType]
) -> UserModel | None:
if token is not None:
try:
parsed_id = user_manager.parse_id(token)

View File

@ -1,5 +1,5 @@
from collections.abc import AsyncGenerator, Sequence
from typing import Generic, Optional
from typing import Generic
import httpx
import pytest
@ -18,7 +18,7 @@ from tests.conftest import User, UserModel
class MockSecurityScheme(SecurityBase):
def __call__(self, request: Request) -> Optional[str]:
def __call__(self, request: Request) -> str | None:
return "mock"
@ -31,8 +31,8 @@ class MockTransport(Transport):
class NoneStrategy(Strategy):
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
return None
@ -41,8 +41,8 @@ class UserStrategy(Strategy, Generic[models.UP]):
self.user = user
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
return self.user
@ -72,9 +72,8 @@ def get_backend_user(user: UserModel):
def get_test_auth_client(get_user_manager, get_test_client):
async def _get_test_auth_client(
backends: list[AuthenticationBackend],
get_enabled_backends: Optional[
DependencyCallable[Sequence[AuthenticationBackend]]
] = None,
get_enabled_backends: DependencyCallable[Sequence[AuthenticationBackend]]
| None = None,
) -> AsyncGenerator[httpx.AsyncClient, None]:
app = FastAPI()
authenticator = Authenticator(backends, get_user_manager)

View File

@ -1,4 +1,5 @@
from typing import Callable, Generic, Optional, cast
from collections.abc import Callable
from typing import Generic, cast
import pytest
from fastapi import Response
@ -21,8 +22,8 @@ class MockTransportLogoutNotSupported(BearerTransport):
class MockStrategyDestroyNotSupported(Strategy, Generic[models.UP]):
async def read_token(
self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
) -> Optional[models.UP]:
self, token: str | None, user_manager: BaseUserManager[models.UP, models.ID]
) -> models.UP | None:
return None
async def write_token(self, user: models.UP) -> str:

View File

@ -1,7 +1,7 @@
import dataclasses
import uuid
from datetime import datetime, timezone
from typing import Any, Optional
from typing import Any
import pytest
@ -30,8 +30,8 @@ class AccessTokenDatabaseMock(AccessTokenDatabase[AccessTokenModel]):
self.store = {}
async def get_by_token(
self, token: str, max_age: Optional[datetime] = None
) -> Optional[AccessTokenModel]:
self, token: str, max_age: datetime | None = None
) -> AccessTokenModel | None:
try:
access_token = self.store[token]
if max_age is not None and access_token.created_at < max_age:

View File

@ -1,5 +1,4 @@
from datetime import datetime
from typing import Optional
import pytest
@ -8,12 +7,12 @@ from tests.conftest import IDType, UserModel
class RedisMock:
store: dict[str, tuple[str, Optional[int]]]
store: dict[str, tuple[str, int | None]]
def __init__(self):
self.store = {}
async def get(self, key: str) -> Optional[str]:
async def get(self, key: str) -> str | None:
try:
value, expiration = self.store[key]
if expiration is not None and expiration < datetime.now().timestamp():
@ -23,7 +22,7 @@ class RedisMock:
else:
return value
async def set(self, key: str, value: str, ex: Optional[int] = None):
async def set(self, key: str, value: str, ex: int | None = None):
expiration = None
if ex is not None:
expiration = int(datetime.now().timestamp() + ex)

View File

@ -1,5 +1,4 @@
from collections.abc import AsyncGenerator
from typing import Optional
import httpx
import pytest
@ -76,13 +75,13 @@ async def test_app_client(
@app.get("/optional-current-user")
def optional_current_user(
user: Optional[UserModel] = Depends(fastapi_users.current_user(optional=True)),
user: UserModel | None = Depends(fastapi_users.current_user(optional=True)),
):
return schemas.model_validate(User, user) if user else None
@app.get("/optional-current-active-user")
def optional_current_active_user(
user: Optional[UserModel] = Depends(
user: UserModel | None = Depends(
fastapi_users.current_user(optional=True, active=True)
),
):
@ -90,7 +89,7 @@ async def test_app_client(
@app.get("/optional-current-verified-user")
def optional_current_verified_user(
user: Optional[UserModel] = Depends(
user: UserModel | None = Depends(
fastapi_users.current_user(optional=True, verified=True)
),
):
@ -98,7 +97,7 @@ async def test_app_client(
@app.get("/optional-current-superuser")
def optional_current_superuser(
user: Optional[UserModel] = Depends(
user: UserModel | None = Depends(
fastapi_users.current_user(optional=True, active=True, superuser=True)
),
):
@ -106,7 +105,7 @@ async def test_app_client(
@app.get("/optional-current-verified-superuser")
def optional_current_verified_superuser(
user: Optional[UserModel] = Depends(
user: UserModel | None = Depends(
fastapi_users.current_user(
optional=True, active=True, verified=True, superuser=True
)

View File

@ -1,5 +1,5 @@
import uuid
from typing import Callable
from collections.abc import Callable
import pytest
from fastapi.security import OAuth2PasswordRequestForm