#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Any from fastapi import Request, Response from fastapi.security.utils import get_authorization_scheme_param from starlette.authentication import AuthCredentials, AuthenticationBackend, AuthenticationError from starlette.requests import HTTPConnection from backend.app.admin.schema.user import GetUserInfoWithRelationDetail from backend.common.exception.errors import TokenError from backend.common.log import log from backend.common.security.jwt import jwt_authentication from backend.core.conf import settings from backend.utils.serializers import MsgSpecJSONResponse class _AuthenticationError(AuthenticationError): """重写内部认证错误类""" def __init__( self, *, code: int | None = None, msg: str | None = None, headers: dict[str, Any] | None = None ) -> None: """ 初始化认证错误 :param code: 错误码 :param msg: 错误信息 :param headers: 响应头 :return: """ self.code = code self.msg = msg self.headers = headers class JwtAuthMiddleware(AuthenticationBackend): """JWT 认证中间件""" @staticmethod def auth_exception_handler(conn: HTTPConnection, exc: _AuthenticationError) -> Response: """ 覆盖内部认证错误处理 :param conn: HTTP 连接对象 :param exc: 认证错误对象 :return: """ return MsgSpecJSONResponse(content={'code': exc.code, 'msg': exc.msg, 'data': None}, status_code=exc.code) async def authenticate(self, request: Request) -> tuple[AuthCredentials, GetUserInfoWithRelationDetail] | None: """ 认证请求 :param request: FastAPI 请求对象 :return: """ token = request.headers.get('Authorization') if not token: return None if request.url.path in settings.TOKEN_REQUEST_PATH_EXCLUDE: return None scheme, token = get_authorization_scheme_param(token) if scheme.lower() != 'bearer': return None try: user = await jwt_authentication(token) except TokenError as exc: raise _AuthenticationError(code=exc.code, msg=exc.detail, headers=exc.headers) except Exception as e: log.exception(f'JWT 授权异常:{e}') raise _AuthenticationError(code=getattr(e, 'code', 500), msg=getattr(e, 'msg', 'Internal Server Error')) # 请注意,此返回使用非标准模式,所以在认证通过时,将丢失某些标准特性 # 标准返回模式请查看:https://www.starlette.io/authentication/ return AuthCredentials(['authenticated']), user