mirror of
https://github.com/fastapi-admin/fastapi-admin.git
synced 2025-08-14 18:58:13 +08:00
new project
This commit is contained in:
@ -1,149 +1,50 @@
|
||||
import json
|
||||
from typing import List, Optional, Type
|
||||
|
||||
import jwt
|
||||
from fastapi import Depends, HTTPException, Path, Query
|
||||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||||
from fastapi.security.utils import get_authorization_scheme_param
|
||||
from pydantic import BaseModel
|
||||
from fastapi import Depends
|
||||
from fastapi.params import Path
|
||||
from starlette.requests import Request
|
||||
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND
|
||||
from tortoise import Tortoise
|
||||
|
||||
from . import enums
|
||||
from .factory import app
|
||||
from .models import AbstractUser
|
||||
|
||||
auth_schema = HTTPBearer()
|
||||
from fastapi_admin.exceptions import InvalidResource
|
||||
from fastapi_admin.resources import Dropdown, Link, Model, Resource
|
||||
|
||||
|
||||
async def jwt_required(
|
||||
request: Request, token: HTTPAuthorizationCredentials = Depends(auth_schema)
|
||||
):
|
||||
credentials_exception = HTTPException(HTTP_401_UNAUTHORIZED)
|
||||
try:
|
||||
payload = jwt.decode(token.credentials, app.admin_secret, algorithms=["HS256"])
|
||||
user_id = payload.get("user_id")
|
||||
if user_id is None:
|
||||
raise credentials_exception
|
||||
except jwt.PyJWTError:
|
||||
raise credentials_exception
|
||||
request.scope["user_id"] = user_id
|
||||
return user_id
|
||||
def get_model(resource: Optional[str] = Path(...)):
|
||||
if not resource:
|
||||
return
|
||||
for app, models in Tortoise.apps.items():
|
||||
model = models.get(resource.title())
|
||||
if model:
|
||||
return model
|
||||
|
||||
|
||||
async def jwt_optional(request: Request):
|
||||
authorization: str = request.headers.get("Authorization")
|
||||
scheme, credentials = get_authorization_scheme_param(authorization)
|
||||
if credentials:
|
||||
try:
|
||||
payload = jwt.decode(credentials, app.admin_secret, algorithms=["HS256"])
|
||||
user_id = payload.get("user_id")
|
||||
request.scope["user_id"] = user_id
|
||||
return user_id
|
||||
except jwt.PyJWTError:
|
||||
pass
|
||||
return
|
||||
def get_model_resource(request: Request, model=Depends(get_model)):
|
||||
return request.app.get_model_resource(model)
|
||||
|
||||
|
||||
class QueryItem(BaseModel):
|
||||
page: int = 1
|
||||
sort: dict
|
||||
where: dict = {}
|
||||
with_: dict = {}
|
||||
size: int = 10
|
||||
sort: dict = {}
|
||||
|
||||
class Config:
|
||||
fields = {"with_": "with"}
|
||||
def _get_resources(resources: List[Type[Resource]]):
|
||||
ret = []
|
||||
for resource in resources:
|
||||
item = {
|
||||
"icon": resource.icon,
|
||||
"label": resource.label,
|
||||
}
|
||||
if issubclass(resource, Link):
|
||||
item["type"] = "link"
|
||||
item["url"] = resource.url
|
||||
item["target"] = resource.target
|
||||
elif issubclass(resource, Model):
|
||||
item["type"] = "model"
|
||||
item["model"] = resource.model.__name__.lower()
|
||||
elif issubclass(resource, Dropdown):
|
||||
item["type"] = "dropdown"
|
||||
item["resources"] = _get_resources(resource.resources)
|
||||
else:
|
||||
raise InvalidResource("Should be subclass of Resource")
|
||||
ret.append(item)
|
||||
return ret
|
||||
|
||||
|
||||
def get_query(query=Query(...)):
|
||||
query = json.loads(query)
|
||||
return QueryItem.parse_obj(query)
|
||||
|
||||
|
||||
def get_model(resource: str = Path(...)):
|
||||
model = app.models.get(resource)
|
||||
return model
|
||||
|
||||
|
||||
async def parse_body(request: Request, resource: str = Path(...)):
|
||||
body = await request.json()
|
||||
resource = await app.get_resource(resource, exclude_pk=True, exclude_m2m_field=False)
|
||||
resource_fields = resource.resource_fields.keys()
|
||||
ret = {}
|
||||
for key in resource_fields:
|
||||
v = body.get(key)
|
||||
if v is not None:
|
||||
ret[key] = v
|
||||
return ret, resource_fields
|
||||
|
||||
|
||||
async def get_current_user(user_id=Depends(jwt_required)):
|
||||
user = await app.user_model.get_or_none(pk=user_id)
|
||||
if not user:
|
||||
raise HTTPException(HTTP_404_NOT_FOUND)
|
||||
return user
|
||||
|
||||
|
||||
class PermissionsChecker:
|
||||
def __init__(self, action: enums.PermissionAction):
|
||||
self.action = action
|
||||
|
||||
async def __call__(self, resource: str = Path(...), user=Depends(get_current_user)):
|
||||
if not app.permission or user.is_superuser:
|
||||
return
|
||||
if not user.is_active:
|
||||
raise HTTPException(status_code=HTTP_403_FORBIDDEN)
|
||||
has_permission = False
|
||||
await user.fetch_related("roles")
|
||||
for role in user.roles:
|
||||
if await role.permissions.filter(model=resource, action=self.action):
|
||||
has_permission = True
|
||||
break
|
||||
if not has_permission:
|
||||
raise HTTPException(status_code=HTTP_403_FORBIDDEN)
|
||||
|
||||
|
||||
read_checker = PermissionsChecker(action=enums.PermissionAction.read)
|
||||
create_checker = PermissionsChecker(action=enums.PermissionAction.create)
|
||||
update_checker = PermissionsChecker(action=enums.PermissionAction.update)
|
||||
delete_checker = PermissionsChecker(action=enums.PermissionAction.delete)
|
||||
|
||||
|
||||
class AdminLog:
|
||||
def __init__(self, action: str):
|
||||
self.action = action
|
||||
|
||||
async def __call__(
|
||||
self, request: Request, resource: str = Path(...), admin_id=Depends(jwt_required),
|
||||
):
|
||||
if app.admin_log:
|
||||
content = None
|
||||
if request.method in ["POST", "PUT"]:
|
||||
content = await request.json()
|
||||
elif request.method == "DELETE":
|
||||
content = {"pk": request.path_params.get("id")}
|
||||
if content:
|
||||
await app.admin_log_model.create(
|
||||
admin_id=admin_id, action=self.action, model=resource, content=content
|
||||
)
|
||||
|
||||
|
||||
admin_log_create = AdminLog(action="create")
|
||||
admin_log_update = AdminLog(action="update")
|
||||
admin_log_delete = AdminLog(action="delete")
|
||||
|
||||
|
||||
class HasPermission:
|
||||
def __init__(self, action: enums.PermissionAction):
|
||||
self.action = action
|
||||
|
||||
|
||||
async def has_resource_permission(
|
||||
action: enums.PermissionAction, resource: str, user: AbstractUser
|
||||
) -> bool:
|
||||
try:
|
||||
await PermissionsChecker(action=action)(resource, user)
|
||||
return True
|
||||
except HTTPException:
|
||||
return False
|
||||
def get_resources(request: Request) -> List[dict]:
|
||||
resources = request.app.resources
|
||||
return _get_resources(resources)
|
||||
|
Reference in New Issue
Block a user