Files
Wu Clan cd96d2f5cd Update dao and service instantiation styles (#276)
* Update dao and service instantiation styles

* Fix auth_service use
2024-01-22 10:22:13 +08:00

46 lines
1.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated
from fastapi import APIRouter, Body, Depends, Path
from backend.app.common.jwt import DependsJwtAuth
from backend.app.common.permission import RequestPermission
from backend.app.common.rbac import DependsRBAC
from backend.app.common.response.response_code import CustomResponseCode
from backend.app.common.response.response_schema import ResponseModel, response_base
from backend.app.services.task_service import task_service
router = APIRouter()
# Endpoint: list the executable task modules (per the route summary).
# Registered on the router's root path with the DependsJwtAuth dependency.
# NOTE: documented with comments rather than a docstring on purpose, since
# FastAPI would publish a docstring as the operation description.
@router.get('', summary='获取所有可执行任务模块', dependencies=[DependsJwtAuth])
async def get_all_tasks() -> ResponseModel:
    # Whatever task_service.get_task_list() yields is passed through
    # unchanged as the payload of the success response.
    return await response_base.success(data=task_service.get_task_list())
# Endpoint: fetch the result of a task identified by the `pk` path segment.
# Registered with the DependsJwtAuth dependency.
# NOTE: documented with comments rather than a docstring on purpose, since
# FastAPI would publish a docstring as the operation description.
@router.get('/{pk}', summary='获取任务结果', dependencies=[DependsJwtAuth])
async def get_task_result(pk: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
    found = task_service.get(pk)
    if found:
        # Truthy lookup: expose its `.result` attribute as the payload.
        return await response_base.success(data=found.result)
    # Falsy lookup: answer with the HTTP_204 custom code and echo the
    # requested id back so the caller can see which task was missing.
    return await response_base.fail(res=CustomResponseCode.HTTP_204, data=pk)
# Endpoint: run the task module named by the `module` path segment.
# Route dependencies: RequestPermission('sys:task:run') followed by DependsRBAC.
# `args` and `kwargs` are optional request-body values (both default to None)
# and are forwarded to the service exactly as received.
# NOTE: documented with comments rather than a docstring on purpose, since
# FastAPI would publish a docstring as the operation description.
@router.post(
    '/{module}',
    summary='执行任务',
    dependencies=[Depends(RequestPermission('sys:task:run')), DependsRBAC],
)
async def run_task(
    module: Annotated[str, Path(description='任务模块')],
    args: Annotated[list | None, Body()] = None,
    kwargs: Annotated[dict | None, Body()] = None,
) -> ResponseModel:
    launched = task_service.run(module=module, args=args, kwargs=kwargs)
    # The `.result` attribute is read immediately after the run call.
    # NOTE(review): if `run` only enqueues the work, this may not yet hold
    # the final value; confirm against task_service.
    outcome = launched.result
    return await response_base.success(data=outcome)