Files
Wu Clan c53fa9fda8 Update and fix permissions logic (#129)
* Update and fix permissions logic

* feat: Update base route

* Exclude non-system routing record operation logs

* Update the parameter variable name

* Fix the jwt authorization verify

* Roles menu authorization is turned off by default

* Fix the operation log code field type

* Update the casbin routing string to config

* Fix JWT middleware

* Add custom msg of token error

* Add the character length of the operation log code field

* Update the logout interface authorization
2023-06-14 22:34:56 +08:00

58 lines
2.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any
from fastapi import Request, Response
from starlette.authentication import AuthenticationBackend, AuthenticationError, AuthCredentials
from starlette.requests import HTTPConnection
from starlette.responses import JSONResponse
from backend.app.common import jwt
from backend.app.common.exception.errors import TokenError
from backend.app.common.log import log
from backend.app.database.db_mysql import async_db_session
class _AuthenticationError(AuthenticationError):
    """Authentication error carrying a response code, message and headers.

    Overrides the internal (Starlette) authentication error class so that a
    failure can transport the values later read by the error handler.

    Attributes are set verbatim from the keyword arguments:
        code: status/business code of the failure, or ``None``.
        msg: human-readable error message, or ``None``.
        headers: optional headers associated with the failure, or ``None``.
    """

    def __init__(
        self,
        *,
        code: int | None = None,
        msg: str | None = None,
        headers: dict[str, Any] | None = None,
    ):
        # Forward the message to the base exception so that ``exc.args`` and
        # ``str(exc)`` are populated (tracebacks, logging, default handlers).
        # When no message is given, keep ``args`` empty rather than ("None",).
        if msg is None:
            super().__init__()
        else:
            super().__init__(msg)
        self.code = code
        self.msg = msg
        self.headers = headers
class JwtAuthMiddleware(AuthenticationBackend):
    """JWT authentication backend (middleware)."""

    @staticmethod
    def auth_exception_handler(conn: HTTPConnection, exc: _AuthenticationError) -> Response:
        """Override the internal authentication error handling.

        Renders the failure as a JSON body ``{'code', 'msg', 'data'}``.
        NOTE(review): the signature matches Starlette's ``on_error`` callback;
        presumably this is registered there - confirm at the call site.

        :param conn: the connection on which authentication failed (unused)
        :param exc: the authentication error carrying ``code`` and ``msg``
        :return: JSON response describing the failure
        """
        code = exc.code
        # ``code`` may be None (the exception's default) or a non-HTTP value
        # picked up via ``getattr(e, 'code', ...)`` in ``authenticate``; such a
        # value cannot be used as an HTTP status, so fall back to 500.
        is_http_status = isinstance(code, int) and not isinstance(code, bool) and 100 <= code <= 599
        status_code = code if is_http_status else 500
        return JSONResponse(
            content={'code': exc.code, 'msg': exc.msg, 'data': None},
            status_code=status_code,
        )

    async def authenticate(self, request: Request) -> tuple[AuthCredentials, Any] | None:
        """Authenticate the request from its ``Authorization`` header.

        :param request: incoming request whose headers are inspected
        :return: ``None`` when the header is absent, is not of the form
            ``<scheme> <token>``, or uses a scheme other than ``bearer``;
            otherwise a tuple of credentials and the user object returned by
            ``jwt.get_current_user``
        :raises _AuthenticationError: when token verification or the user
            lookup fails
        """
        auth = request.headers.get('Authorization')
        if not auth:
            return None

        # Unpacking ``auth.split()`` directly raises ValueError for headers such
        # as "Bearer" or "Bearer a b"; treat anything that is not exactly
        # "<scheme> <token>" as carrying no usable credentials.
        parts = auth.split()
        if len(parts) != 2:
            return None

        scheme, token = parts
        if scheme.lower() != 'bearer':
            return None

        try:
            sub = await jwt.jwt_authentication(token)
            async with async_db_session() as db:
                user = await jwt.get_current_user(db, data=sub)
        except TokenError as exc:
            raise _AuthenticationError(code=exc.code, msg=exc.detail, headers=exc.headers) from exc
        except Exception as e:
            # Any other failure is logged and reported as an authentication
            # error, reusing the exception's own code/msg when it has them.
            log.exception(e)
            raise _AuthenticationError(
                code=getattr(e, 'code', 500),
                msg=getattr(e, 'msg', 'Internal Server Error'),
            ) from e

        # Note: this return uses a non-standard pattern, so some standard
        # features are lost once authentication succeeds.
        # For the standard return pattern see https://www.starlette.io/authentication/
        return AuthCredentials(['authenticated']), user