Add support for celery dynamic tasks (#715)

* Add support for celery dynamic tasks

* Update the celery conf

* Update the celery task tables name

* Refactor the celery task-related interfaces

* Optimize auto-discovery tasks

* Remove redundant config

* Refine the business codes

* Optimize crontab validation returns

* Update dependencies in pyproject toml

* Fix some bugs

* Update dependencies

* Update the version to 1.7.0

* Fix update and delete event
This commit is contained in:
Wu Clan
2025-07-11 07:54:33 +08:00
committed by GitHub
parent e84ef04f15
commit ce3be1db8e
39 changed files with 2561 additions and 1263 deletions

View File

@ -24,7 +24,6 @@ OAUTH2_LINUX_DO_CLIENT_SECRET='test'
# App Task
# Celery
CELERY_BROKER_REDIS_DATABASE=1
CELERY_BACKEND_REDIS_DATABASE=2
# Rabbitmq
CELERY_RABBITMQ_HOST='127.0.0.1'
CELERY_RABBITMQ_PORT=5672

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
from backend.utils.console import console
__version__ = '1.6.0'
__version__ = '1.7.0'
def get_version() -> str | None:

View File

@ -13,21 +13,16 @@ from sqlalchemy.ext.asyncio import async_engine_from_config
sys.path.append('../')
from backend.app import get_app_models
from backend.common.model import MappedBase
from backend.core import path_conf
from backend.database.db import SQLALCHEMY_DATABASE_URL
from backend.plugin.tools import get_plugin_models
# import your new model here
from backend.app.admin.model import * # noqa: F401
from backend.plugin.code_generator.model import * # noqa: F401
# import plugin model
for cls in get_plugin_models():
# import models
for cls in get_app_models() + get_plugin_models():
class_name = cls.__name__
if class_name in globals():
print(f'\nWarning: Class "{class_name}" already exists in global namespace.')
else:
if class_name not in globals():
globals()[class_name] = cls
if not os.path.exists(path_conf.ALEMBIC_VERSION_DIR):

View File

@ -1,2 +1,43 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import inspect
import os.path
from backend.common.log import log
from backend.core.path_conf import BASE_PATH
from backend.utils.import_parse import import_module_cached
def get_app_models():
"""获取 app 所有模型类"""
app_path = os.path.join(BASE_PATH, 'app')
list_dirs = os.listdir(app_path)
apps = []
for d in list_dirs:
if os.path.isdir(os.path.join(app_path, d)) and d != '__pycache__':
apps.append(d)
classes = []
for app in apps:
try:
module_path = f'backend.app.{app}.model'
module = import_module_cached(module_path)
except Exception as e:
log.warning(f'应用 {app} 中不包含 model 相关配置: {e}')
continue
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj):
classes.append(obj)
return classes
# import all app models for auto create db tables
for cls in get_app_models():
class_name = cls.__name__
if class_name not in globals():
globals()[class_name] = cls

View File

@ -3,21 +3,21 @@
当前任务使用 Celery
实现,实施方案请查看 [#225](https://github.com/fastapi-practices/fastapi_best_architecture/discussions/225)
## 添加任务
## 定时任务
> [!IMPORTANT]
> 由于 Celery 任务扫描规则,使其对任务的目录结构要求及其严格,务必在 celery_task 目录下添加任务
`backend/app/task/tasks/beat.py` 文件内编写相关定时任务
### 简单任务
可以直接`tasks.py` 文件内编写相关任务代码
`backend/app/task/tasks/tasks.py` 文件内编写相关任务代码
### 层级任务
如果你想对任务进行目录层级划分,使任务结构更加清晰,你可以新建任意目录,但必须注意的是
1. 新建目录后,务必更新任务配置 `CELERY_TASKS_PACKAGES`,将新建目录添加到此列表
2. 新建目录,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码
1. `backend/app/task/tasks` 目录下新建 python 包目录
2. 新建目录,务必更新 `conf.py` 配置中的 `CELERY_TASKS_PACKAGES`,将新建目录模块路径添加到此列表
3. 在新建目录下,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码
## 消息代理

View File

@ -2,9 +2,11 @@
# -*- coding: utf-8 -*-
from fastapi import APIRouter
from backend.app.task.api.v1.task import router as task_router
from backend.app.task.api.v1.result import router as task_result_router
from backend.app.task.api.v1.scheduler import router as task_scheduler_router
from backend.core.conf import settings
v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH)
v1 = APIRouter(prefix=f'{settings.FASTAPI_API_V1_PATH}/task', tags=['任务'])
v1.include_router(task_router, prefix='/tasks', tags=['任务'])
v1.include_router(task_result_router, prefix='/results')
v1.include_router(task_scheduler_router, prefix='/schedulers')

View File

@ -0,0 +1,57 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated
from fastapi import APIRouter, Depends, Path, Query
from backend.app.task.schema.result import DeleteTaskResultParam, GetTaskResultDetail
from backend.app.task.service.result_service import task_result_service
from backend.common.pagination import DependsPagination, PageData, paging_data
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
from backend.common.security.jwt import DependsJwtAuth
from backend.common.security.permission import RequestPermission
from backend.common.security.rbac import DependsRBAC
from backend.database.db import CurrentSession
router = APIRouter()
@router.get('/{pk}', summary='获取任务结果详情', dependencies=[DependsJwtAuth])
async def get_task_result(
pk: Annotated[int, Path(description='任务结果 ID')],
) -> ResponseSchemaModel[GetTaskResultDetail]:
result = await task_result_service.get(pk=pk)
return response_base.success(data=result)
@router.get(
'',
summary='分页获取所有任务结果',
dependencies=[
DependsJwtAuth,
DependsPagination,
],
)
async def get_task_results_paged(
db: CurrentSession,
name: Annotated[str | None, Query(description='任务名称')] = None,
task_id: Annotated[str | None, Query(description='任务 ID')] = None,
) -> ResponseSchemaModel[PageData[GetTaskResultDetail]]:
result_select = await task_result_service.get_select(name=name, task_id=task_id)
page_data = await paging_data(db, result_select)
return response_base.success(data=page_data)
@router.delete(
'',
summary='批量删除任务结果',
dependencies=[
Depends(RequestPermission('sys:task:del')),
DependsRBAC,
],
)
async def delete_task_result(obj: DeleteTaskResultParam) -> ResponseModel:
count = await task_result_service.delete(obj=obj)
if count > 0:
return response_base.success()
return response_base.fail()

View File

@ -0,0 +1,134 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated
from fastapi import APIRouter, Depends, Path, Query
from backend.app.task.schema.scheduler import CreateTaskSchedulerParam, GetTaskSchedulerDetail, UpdateTaskSchedulerParam
from backend.app.task.service.scheduler_service import task_scheduler_service
from backend.common.pagination import DependsPagination, PageData, paging_data
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
from backend.common.security.jwt import DependsJwtAuth
from backend.common.security.permission import RequestPermission
from backend.common.security.rbac import DependsRBAC
from backend.database.db import CurrentSession
router = APIRouter()
@router.get('/all', summary='获取所有任务调度', dependencies=[DependsJwtAuth])
async def get_all_task_schedulers() -> ResponseSchemaModel[list[GetTaskSchedulerDetail]]:
schedulers = await task_scheduler_service.get_all()
return response_base.success(data=schedulers)
@router.get('/{pk}', summary='获取任务调度详情', dependencies=[DependsJwtAuth])
async def get_task_scheduler(
pk: Annotated[int, Path(description='任务调度 ID')],
) -> ResponseSchemaModel[GetTaskSchedulerDetail]:
task_scheduler = await task_scheduler_service.get(pk=pk)
return response_base.success(data=task_scheduler)
@router.get(
'',
summary='分页获取所有任务调度',
dependencies=[
DependsJwtAuth,
DependsPagination,
],
)
async def get_task_scheduler_paged(
db: CurrentSession,
name: Annotated[int, Path(description='任务调度名称')] = None,
type: Annotated[int | None, Query(description='任务调度类型')] = None,
) -> ResponseSchemaModel[PageData[GetTaskSchedulerDetail]]:
task_scheduler_select = await task_scheduler_service.get_select(name=name, type=type)
page_data = await paging_data(db, task_scheduler_select)
return response_base.success(data=page_data)
@router.post(
'',
summary='创建任务调度',
dependencies=[
Depends(RequestPermission('sys:task:add')),
DependsRBAC,
],
)
async def create_task_scheduler(obj: CreateTaskSchedulerParam) -> ResponseModel:
await task_scheduler_service.create(obj=obj)
return response_base.success()
@router.put(
'/{pk}',
summary='更新任务调度',
dependencies=[
Depends(RequestPermission('sys:task:edit')),
DependsRBAC,
],
)
async def update_task_scheduler(
pk: Annotated[int, Path(description='任务调度 ID')], obj: UpdateTaskSchedulerParam
) -> ResponseModel:
count = await task_scheduler_service.update(pk=pk, obj=obj)
if count > 0:
return response_base.success()
return response_base.fail()
@router.put(
'/{pk}/status',
summary='更新任务调度状态',
dependencies=[
Depends(RequestPermission('sys:task:edit')),
DependsRBAC,
],
)
async def update_task_scheduler_status(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
count = await task_scheduler_service.update_status(pk=pk)
if count > 0:
return response_base.success()
return response_base.fail()
@router.delete(
'/{pk}',
summary='删除任务调度',
dependencies=[
Depends(RequestPermission('sys:task:del')),
DependsRBAC,
],
)
async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
count = await task_scheduler_service.delete(pk=pk)
if count > 0:
return response_base.success()
return response_base.fail()
@router.post(
'/{pk}/executions',
summary='手动执行任务',
dependencies=[
Depends(RequestPermission('sys:task:exec')),
DependsRBAC,
],
)
async def execute_task(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
await task_scheduler_service.execute(pk=pk)
return response_base.success()
@router.delete(
'/{task_id}/cancel',
summary='撤销任务',
dependencies=[
Depends(RequestPermission('sys:task:revoke')),
DependsRBAC,
],
)
async def revoke_task(task_id: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
await task_scheduler_service.revoke(task_id=task_id)
return response_base.success()

View File

@ -1,58 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Annotated
from fastapi import APIRouter, Depends, Path
from backend.app.task.schema.task import RunParam, TaskResult
from backend.app.task.service.task_service import task_service
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
from backend.common.security.jwt import DependsJwtAuth
from backend.common.security.permission import RequestPermission
from backend.common.security.rbac import DependsRBAC
router = APIRouter()
@router.get(
'/{tid}',
summary='获取任务详情',
deprecated=True,
description='此接口被视为作废,建议使用 flower 查看任务详情',
dependencies=[DependsJwtAuth],
)
async def get_task(tid: Annotated[str, Path(description='任务 UUID')]) -> ResponseSchemaModel[TaskResult]:
status = task_service.get(tid=tid)
return response_base.success(data=status)
@router.get('', summary='获取所有任务', dependencies=[DependsJwtAuth])
async def get_all_tasks() -> ResponseSchemaModel[list[str]]:
tasks = await task_service.get_all()
return response_base.success(data=tasks)
@router.delete(
'/{tid}',
summary='撤销任务',
dependencies=[
Depends(RequestPermission('sys:task:revoke')),
DependsRBAC,
],
)
async def revoke_task(tid: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
task_service.revoke(tid=tid)
return response_base.success()
@router.post(
'/runs',
summary='运行任务',
dependencies=[
Depends(RequestPermission('sys:task:run')),
DependsRBAC,
],
)
async def run_task(obj: RunParam) -> ResponseSchemaModel[str]:
task = task_service.run(obj=obj)
return response_base.success(data=task)

View File

@ -1,44 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any
import os
import celery
import celery_aio_pool
from backend.app.task.model.result import OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME, OVERWRITE_CELERY_RESULT_TABLE_NAME
from backend.app.task.tasks.beat import LOCAL_BEAT_SCHEDULE
from backend.core.conf import settings
__all__ = ['celery_app']
from backend.core.path_conf import BASE_PATH
def get_broker_url() -> str:
"""获取消息代理 URL"""
if settings.CELERY_BROKER == 'redis':
return (
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
)
return (
f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@'
f'{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}'
)
def get_result_backend() -> str:
"""获取结果后端 URL"""
return (
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{settings.CELERY_BACKEND_REDIS_DATABASE}'
)
def get_result_backend_transport_options() -> dict[str, Any]:
"""获取结果后端传输选项"""
return {
'global_keyprefix': settings.CELERY_BACKEND_REDIS_PREFIX,
'retry_policy': {
'timeout': settings.CELERY_BACKEND_REDIS_TIMEOUT,
},
}
def find_task_packages():
packages = []
for root, dirs, files in os.walk(os.path.join(BASE_PATH, 'app', 'task', 'tasks')):
if 'tasks.py' in files:
package = root.replace(str(BASE_PATH) + os.path.sep, '').replace(os.path.sep, '.')
packages.append(package)
return packages
def init_celery() -> celery.Celery:
@ -52,19 +31,29 @@ def init_celery() -> celery.Celery:
app = celery.Celery(
'fba_celery',
broker=f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
if settings.CELERY_BROKER == 'redis'
else f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}',
broker_connection_retry_on_startup=True,
backend=f'db+{settings.DATABASE_TYPE + "+pymysql" if settings.DATABASE_TYPE == "mysql" else settings.DATABASE_TYPE}' # noqa: E501
f'://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_SCHEMA}',
database_engine_options={'echo': settings.DATABASE_ECHO},
database_table_names={
'task': OVERWRITE_CELERY_RESULT_TABLE_NAME,
'group': OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME,
},
result_extended=True,
# result_expires=0, # 任务结果自动清理0 或 None 表示不清理
beat_schedule=LOCAL_BEAT_SCHEDULE,
beat_scheduler='app.task.utils.schedulers:DatabaseScheduler',
task_cls='app.task.tasks.base:TaskBase',
task_track_started=True,
enable_utc=False,
timezone=settings.DATETIME_TIMEZONE,
beat_schedule=settings.CELERY_SCHEDULE,
broker_url=get_broker_url(),
broker_connection_retry_on_startup=True,
result_backend=get_result_backend(),
result_backend_transport_options=get_result_backend_transport_options(),
task_cls='app.task.celery_task.base:TaskBase',
task_track_started=True,
)
# 自动发现任务
app.autodiscover_tasks(settings.CELERY_TASK_PACKAGES)
app.autodiscover_tasks(find_task_packages())
return app

View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy_crud_plus import CRUDPlus
from backend.app.task.model.result import TaskResult
class CRUDTaskResult(CRUDPlus[TaskResult]):
"""任务结果数据库操作类"""
async def get(self, db: AsyncSession, pk: int) -> TaskResult | None:
"""
获取任务结果详情
:param db: 数据库会话
:param pk: 任务 ID
:return:
"""
return await self.select_model(db, pk)
async def get_list(self, name: str | None, task_id: str | None) -> Select:
"""
获取任务结果列表
:param name: 任务名称
:param task_id: 任务 ID
:return:
"""
filters = {}
if name is not None:
filters['name__like'] = f'%{name}%'
if task_id is not None:
filters['task_id'] = task_id
return await self.select_order('id', **filters)
async def delete(self, db: AsyncSession, pks: list[int]) -> int:
"""
批量删除任务结果
:param db: 数据库会话
:param pks: 任务结果 ID 列表
:return:
"""
return await self.delete_model_by_column(db, allow_multiple=True, id__in=pks)
task_result_dao: CRUDTaskResult = CRUDTaskResult(TaskResult)

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Sequence
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy_crud_plus import CRUDPlus
from backend.app.task.model import TaskScheduler
from backend.app.task.schema.scheduler import CreateTaskSchedulerParam, UpdateTaskSchedulerParam
class CRUDTaskScheduler(CRUDPlus[TaskScheduler]):
"""任务调度数据库操作类"""
@staticmethod
async def get(db: AsyncSession, pk: int) -> TaskScheduler | None:
"""
获取任务调度
:param db: 数据库会话
:param pk: 任务调度 ID
:return:
"""
return await task_scheduler_dao.select_model(db, pk)
async def get_all(self, db: AsyncSession) -> Sequence[TaskScheduler]:
"""
获取所有任务调度
:param db: 数据库会话
:return:
"""
return await self.select_models(db)
async def get_list(self, name: str | None, type: int | None) -> Select:
"""
获取任务调度列表
:param name: 任务调度名称
:param type: 任务调度类型
:return:
"""
filters = {}
if name is not None:
filters['name__like'] = f'%{name}%'
if type is not None:
filters['type'] = type
return await self.select_order('id', **filters)
async def get_by_name(self, db: AsyncSession, name: str) -> TaskScheduler | None:
"""
通过名称获取任务调度
:param db: 数据库会话
:param name: 任务调度名称
:return:
"""
return await self.select_model_by_column(db, name=name)
async def create(self, db: AsyncSession, obj: CreateTaskSchedulerParam) -> None:
"""
创建任务调度
:param db: 数据库会话
:param obj: 创建任务调度参数
:return:
"""
await self.create_model(db, obj, flush=True)
TaskScheduler.no_changes = False
async def update(self, db: AsyncSession, pk: int, obj: UpdateTaskSchedulerParam) -> int:
"""
更新任务调度
:param db: 数据库会话
:param pk: 任务调度 ID
:param obj: 更新任务调度参数
:return:
"""
task_scheduler = await self.get(db, pk)
for key, value in obj.model_dump(exclude_unset=True).items():
setattr(task_scheduler, key, value)
TaskScheduler.no_changes = False
return 1
async def set_status(self, db: AsyncSession, pk: int, status: bool) -> int:
"""
设置任务调度状态
:param db: 数据库会话
:param pk: 任务调度 ID
:param status: 状态
:return:
"""
task_scheduler = await self.get(db, pk)
setattr(task_scheduler, 'enabled', status)
TaskScheduler.no_changes = False
return 1
async def delete(self, db: AsyncSession, pk: int) -> int:
"""
删除任务调度
:param db: 数据库会话
:param pk: 任务调度 ID
:return:
"""
task_scheduler = await self.get(db, pk)
await db.delete(task_scheduler)
TaskScheduler.no_changes = False
return 1
task_scheduler_dao: CRUDTaskScheduler = CRUDTaskScheduler(TaskScheduler)

20
backend/app/task/enums.py Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from backend.common.enums import IntEnum, StrEnum
class TaskSchedulerType(IntEnum):
"""任务调度类型"""
INTERVAL = 0
CRONTAB = 1
class PeriodType(StrEnum):
"""周期类型"""
DAYS = 'days'
HOURS = 'hours'
MINUTES = 'minutes'
SECONDS = 'seconds'
MICROSECONDS = 'microseconds'

View File

@ -0,0 +1,3 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from backend.app.task.model.scheduler import TaskScheduler

View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from celery.backends.database.models import TaskExtended as TaskResult
OVERWRITE_CELERY_RESULT_TABLE_NAME = 'task_result'
OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME = 'task_group_result'
# 重写表名配置
TaskResult.configure(name=OVERWRITE_CELERY_RESULT_TABLE_NAME)

View File

@ -0,0 +1,94 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
from datetime import datetime
from sqlalchemy import (
JSON,
Boolean,
DateTime,
String,
event,
)
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.dialects.postgresql import INTEGER, TEXT
from sqlalchemy.orm import Mapped, mapped_column
from backend.common.exception import errors
from backend.common.model import Base, id_key
from backend.core.conf import settings
from backend.database.redis import redis_client
from backend.utils.timezone import timezone
class TaskScheduler(Base):
"""任务调度表"""
__tablename__ = 'task_scheduler'
id: Mapped[id_key] = mapped_column(init=False)
name: Mapped[str] = mapped_column(String(50), unique=True, comment='任务名称')
task: Mapped[str] = mapped_column(String(255), comment='要运行的 Celery 任务(模块化字符串)')
args: Mapped[str | None] = mapped_column(JSON(), comment='任务可接收的位置参数')
kwargs: Mapped[str | None] = mapped_column(JSON(), comment='任务可接收的关键字参数')
queue: Mapped[str | None] = mapped_column(String(255), comment='CELERY_TASK_QUEUES 中定义的队列')
exchange: Mapped[str | None] = mapped_column(String(255), comment='低级别 AMQP 路由的交换机')
routing_key: Mapped[str | None] = mapped_column(String(255), comment='低级别 AMQP 路由的路由密钥')
start_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), comment='任务开始触发的时间')
expire_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), comment='任务不再触发的截止时间')
expire_seconds: Mapped[int | None] = mapped_column(comment='任务不再触发的秒数时间差')
type: Mapped[int] = mapped_column(comment='调度类型0间隔 1定时')
interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数')
interval_period: Mapped[str | None] = mapped_column(String(255), comment='任务运行之间的周期类型')
crontab_minute: Mapped[str | None] = mapped_column(String(60 * 4), default='*', comment='运行的分钟,"*" 表示全部')
crontab_hour: Mapped[str | None] = mapped_column(String(24 * 4), default='*', comment='运行的小时,"*" 表示全部')
crontab_day_of_week: Mapped[str | None] = mapped_column(String(64), default='*', comment='运行的星期,"*" 表示全部')
crontab_day_of_month: Mapped[str | None] = mapped_column(
String(31 * 4), default='*', comment='运行的每月日期,"*" 表示全部'
)
crontab_month_of_year: Mapped[str | None] = mapped_column(
String(64), default='*', comment='运行的月份,"*" 表示全部'
)
one_off: Mapped[bool] = mapped_column(
Boolean().with_variant(INTEGER, 'postgresql'), default=False, comment='是否仅运行一次'
)
enabled: Mapped[bool] = mapped_column(
Boolean().with_variant(INTEGER, 'postgresql'), default=True, comment='是否启用任务'
)
total_run_count: Mapped[int] = mapped_column(default=0, comment='任务触发的总次数')
last_run_time: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), default=None, comment='任务最后触发的时间'
)
remark: Mapped[str | None] = mapped_column(
LONGTEXT().with_variant(TEXT, 'postgresql'), default=None, comment='备注'
)
no_changes: bool = False
@staticmethod
def before_insert_or_update(mapper, connection, target):
if target.expire_seconds is not None and target.expire_time:
raise errors.ConflictError(msg='expires 和 expire_seconds 只能设置一个')
@classmethod
def changed(cls, mapper, connection, target):
if not target.no_changes:
cls.update_changed(mapper, connection, target)
@classmethod
async def update_changed_async(cls):
now = timezone.now()
await redis_client.set(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
@classmethod
def update_changed(cls, mapper, connection, target):
asyncio.create_task(cls.update_changed_async())
# 事件监听器
event.listen(TaskScheduler, 'before_insert', TaskScheduler.before_insert_or_update)
event.listen(TaskScheduler, 'before_update', TaskScheduler.before_insert_or_update)
event.listen(TaskScheduler, 'after_insert', TaskScheduler.update_changed)
event.listen(TaskScheduler, 'after_delete', TaskScheduler.update_changed)
event.listen(TaskScheduler, 'after_update', TaskScheduler.changed)

View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from pydantic import ConfigDict, Field
from backend.common.schema import SchemaBase
class TaskResultSchemaBase(SchemaBase):
"""任务结果基础模型"""
task_id: str = Field(description='任务 ID')
status: str = Field(description='执行状态')
result: bytes | None = Field(description='执行结果')
date_done: datetime | None = Field(description='结束时间')
traceback: str | None = Field(description='错误回溯')
name: str | None = Field(description='任务名称')
args: bytes | None = Field(description='任务位置参数')
kwargs: bytes | None = Field(description='任务关键字参数')
worker: str | None = Field(description='运行 Worker')
retries: int | None = Field(description='重试次数')
queue: str | None = Field(description='运行队列')
class DeleteTaskResultParam(SchemaBase):
"""删除任务结果参数"""
pks: list[int] = Field(description='任务结果 ID 列表')
class GetTaskResultDetail(TaskResultSchemaBase):
"""任务结果详情"""
model_config = ConfigDict(from_attributes=True)
id: int = Field(description='任务结果 ID')

View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from pydantic import ConfigDict, Field
from pydantic.types import JsonValue
from backend.app.task.enums import PeriodType, TaskSchedulerType
from backend.common.schema import SchemaBase
class TaskSchedulerSchemeBase(SchemaBase):
"""任务调度参数"""
name: str = Field(description='任务名称')
task: str = Field(description='要运行的 Celery 任务(模块化字符串)')
args: JsonValue | None = Field(default='[]', description='任务可接收的位置参数')
kwargs: JsonValue | None = Field(default='{}', description='任务可接收的关键字参数')
queue: str | None = Field(default=None, description='CELERY_TASK_QUEUES 中定义的队列')
exchange: str | None = Field(default=None, description='低级别 AMQP 路由的交换机')
routing_key: str | None = Field(default=None, description='低级别 AMQP 路由的路由密钥')
start_time: datetime | None = Field(default=None, description='任务开始触发的时间')
expire_time: datetime | None = Field(default=None, description='任务不再触发的截止时间')
expire_seconds: int | None = Field(default=None, description='任务不再触发的秒数时间差')
type: TaskSchedulerType = Field(default=TaskSchedulerType.INTERVAL, description='任务调度类型0间隔 1定时')
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
crontab_minute: str | None = Field(default='*', description='运行的分钟,"*" 表示全部')
crontab_hour: str | None = Field(default='*', description='运行的小时,"*" 表示全部')
crontab_day_of_week: str | None = Field(default='*', description='运行的星期,"*" 表示全部')
crontab_day_of_month: str | None = Field(default='*', description='运行的每月日期,"*" 表示全部')
crontab_month_of_year: str | None = Field(default='*', description='运行的月份,"*" 表示全部')
one_off: bool = Field(default=False, description='是否仅运行一次')
remark: str | None = Field(default=None, description='备注')
class CreateTaskSchedulerParam(TaskSchedulerSchemeBase):
"""创建任务调度参数"""
class UpdateTaskSchedulerParam(TaskSchedulerSchemeBase):
"""更新任务调度参数"""
class GetTaskSchedulerDetail(TaskSchedulerSchemeBase):
"""任务调度详情"""
model_config = ConfigDict(from_attributes=True)
id: int = Field(description='任务调度 ID')
created_time: datetime = Field(description='创建时间')
updated_time: datetime | None = Field(None, description='更新时间')

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Any
from pydantic import Field
from backend.common.schema import SchemaBase
class RunParam(SchemaBase):
"""任务运行参数"""
name: str = Field(description='任务名称')
args: list[Any] | None = Field(None, description='任务函数位置参数')
kwargs: dict[str, Any] | None = Field(None, description='任务函数关键字参数')
class TaskResult(SchemaBase):
"""任务执行结果"""
result: str = Field(description='任务执行结果')
traceback: str | None = Field(None, description='错误堆栈信息')
status: str = Field(description='任务状态')
name: str | None = Field(None, description='任务名称')
args: list[Any] | None = Field(None, description='任务函数位置参数')
kwargs: dict[str, Any] | None = Field(None, description='任务函数关键字参数')
worker: str | None = Field(None, description='执行任务的 worker')
retries: int | None = Field(None, description='重试次数')
queue: str | None = Field(None, description='任务队列')

View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from sqlalchemy import Select
from backend.app.task.crud.crud_result import task_result_dao
from backend.app.task.model.result import TaskResult
from backend.app.task.schema.result import DeleteTaskResultParam
from backend.common.exception import errors
from backend.database.db import async_db_session
class TaskResultService:
@staticmethod
async def get(*, pk: int) -> TaskResult:
"""
获取任务结果详情
:param pk: 任务 ID
:return:
"""
async with async_db_session() as db:
result = await task_result_dao.get(db, pk)
if not result:
raise errors.NotFoundError(msg='任务结果不存在')
return result
@staticmethod
async def get_select(*, name: str | None, task_id: str | None) -> Select:
"""
获取任务结果列表查询条件
:param name: 任务名称
:param task_id: 任务 ID
:return:
"""
return await task_result_dao.get_list(name, task_id)
@staticmethod
async def delete(*, obj: DeleteTaskResultParam) -> int:
"""
批量删除任务结果
:param obj: 任务结果 ID 列表
:return:
"""
async with async_db_session.begin() as db:
count = await task_result_dao.delete(db, obj.pks)
return count
task_result_service: TaskResultService = TaskResultService()

View File

@ -0,0 +1,165 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from typing import Sequence
from sqlalchemy import Select
from starlette.concurrency import run_in_threadpool
from backend.app.task.celery import celery_app
from backend.app.task.crud.crud_scheduler import task_scheduler_dao
from backend.app.task.enums import TaskSchedulerType
from backend.app.task.model import TaskScheduler
from backend.app.task.schema.scheduler import CreateTaskSchedulerParam, UpdateTaskSchedulerParam
from backend.app.task.utils.tzcrontab import crontab_verify
from backend.common.exception import errors
from backend.database.db import async_db_session
class TaskSchedulerService:
"""任务调度服务类"""
@staticmethod
async def get(*, pk) -> TaskScheduler | None:
"""
获取任务调度详情
:param pk: 任务调度 ID
:return:
"""
async with async_db_session() as db:
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
return task_scheduler
@staticmethod
async def get_all() -> Sequence[TaskScheduler]:
"""获取所有任务调度"""
async with async_db_session() as db:
task_schedulers = await task_scheduler_dao.get_all(db)
return task_schedulers
@staticmethod
async def get_select(*, name: str | None, type: int | None) -> Select:
"""
获取任务调度列表查询条件
:param name: 任务调度名称
:param type: 任务调度类型
:return:
"""
return await task_scheduler_dao.get_list(name=name, type=type)
@staticmethod
async def create(*, obj: CreateTaskSchedulerParam) -> None:
"""
创建任务调度
:param obj: 任务调度创建参数
:return:
"""
async with async_db_session.begin() as db:
task_scheduler = await task_scheduler_dao.get_by_name(db, obj.name)
if task_scheduler:
raise errors.ConflictError(msg='任务调度已存在')
await task_scheduler_dao.create(db, obj)
@staticmethod
async def update(*, pk: int, obj: UpdateTaskSchedulerParam) -> int:
"""
更新任务调度
:param pk: 任务调度 ID
:param obj: 任务调度更新参数
:return:
"""
async with async_db_session.begin() as db:
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
if task_scheduler.name != obj.name:
if await task_scheduler_dao.get_by_name(db, obj.name):
raise errors.ConflictError(msg='任务调度已存在')
if task_scheduler.type == TaskSchedulerType.CRONTAB:
crontab_verify('m', task_scheduler.crontab_minute)
crontab_verify('h', task_scheduler.crontab_hour)
crontab_verify('dow', task_scheduler.crontab_day_of_week)
crontab_verify('dom', task_scheduler.crontab_day_of_month)
crontab_verify('moy', task_scheduler.crontab_month_of_year)
count = await task_scheduler_dao.update(db, pk, obj)
return count
@staticmethod
async def update_status(*, pk: int) -> int:
"""
更新任务调度状态
:param pk: 任务调度 ID
:return:
"""
async with async_db_session.begin() as db:
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
if task_scheduler.type == TaskSchedulerType.CRONTAB:
crontab_verify('m', task_scheduler.crontab_minute)
crontab_verify('h', task_scheduler.crontab_hour)
crontab_verify('dow', task_scheduler.crontab_day_of_week)
crontab_verify('dom', task_scheduler.crontab_day_of_month)
crontab_verify('moy', task_scheduler.crontab_month_of_year)
count = await task_scheduler_dao.set_status(db, pk, not task_scheduler.enabled)
return count
@staticmethod
async def delete(*, pk) -> int:
"""
删除任务调度
:param pk: 用户 ID
:return:
"""
async with async_db_session.begin() as db:
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
count = await task_scheduler_dao.delete(db, pk)
return count
@staticmethod
async def execute(*, pk: int) -> None:
"""
执行任务
:param pk: 任务调度 ID
:return:
"""
async with async_db_session() as db:
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
if not workers:
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
task_scheduler = await task_scheduler_dao.get(db, pk)
if not task_scheduler:
raise errors.NotFoundError(msg='任务调度不存在')
celery_app.send_task(
name=task_scheduler.task,
args=json.loads(task_scheduler.args),
kwargs=json.loads(task_scheduler.kwargs),
)
@staticmethod
async def revoke(*, task_id: str) -> None:
"""
撤销指定的任务
:param task_id: 任务 UUID
:return:
"""
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
if not workers:
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
celery_app.control.revoke(task_id)
task_scheduler_service: TaskSchedulerService = TaskSchedulerService()

View File

@ -1,72 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from celery.exceptions import NotRegistered
from celery.result import AsyncResult
from starlette.concurrency import run_in_threadpool
from backend.app.task.celery import celery_app
from backend.app.task.schema.task import RunParam, TaskResult
from backend.common.exception import errors
class TaskService:
@staticmethod
def get(*, tid: str) -> TaskResult:
"""
获取指定任务的详细信息
:param tid: 任务 UUID
:return:
"""
try:
result = AsyncResult(id=tid, app=celery_app)
except NotRegistered:
raise errors.NotFoundError(msg='任务不存在')
return TaskResult(
result=result.result,
traceback=result.traceback,
status=result.state,
name=result.name,
args=result.args,
kwargs=result.kwargs,
worker=result.worker,
retries=result.retries,
queue=result.queue,
)
@staticmethod
async def get_all() -> list[str]:
"""获取所有已注册的 Celery 任务列表"""
registered_tasks = await run_in_threadpool(celery_app.control.inspect().registered)
if not registered_tasks:
raise errors.ServerError(msg='Celery 服务未启动')
tasks = list(registered_tasks.values())[0]
return tasks
@staticmethod
def revoke(*, tid: str) -> None:
"""
撤销指定的任务
:param tid: 任务 UUID
:return:
"""
try:
result = AsyncResult(id=tid, app=celery_app)
except NotRegistered:
raise errors.NotFoundError(msg='任务不存在')
result.revoke(terminate=True)
@staticmethod
def run(*, obj: RunParam) -> str:
"""
运行指定的任务
:param obj: 任务运行参数
:return:
"""
task: AsyncResult = celery_app.send_task(name=obj.name, args=obj.args, kwargs=obj.kwargs)
return task.task_id
task_service: TaskService = TaskService()

View File

@ -45,5 +45,4 @@ class TaskBase(Task):
:param einfo: 异常信息
:return:
"""
loop = asyncio.get_event_loop()
loop.create_task(task_notification(msg=f'任务 {task_id} 执行失败'))
asyncio.create_task(task_notification(msg=f'任务 {task_id} 执行失败'))

View File

@ -0,0 +1,24 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from celery.schedules import schedule
from backend.app.task.utils.tzcrontab import TzAwareCrontab
LOCAL_BEAT_SCHEDULE = {
'exec_every_10_seconds': {
'task': 'task_demo_async',
'schedule': schedule(10),
},
'exec_every_1_minute_of_hour': {
'task': 'task_demo_async',
'schedule': TzAwareCrontab('1'),
},
'exec_every_sunday': {
'task': 'delete_db_opera_log',
'schedule': TzAwareCrontab('0', '0', day_of_week='6'),
},
'exec_every_15_of_month': {
'task': 'delete_db_login_log',
'schedule': TzAwareCrontab('0', '0', day_of_month='15'),
},
}

View File

@ -0,0 +1,2 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,2 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

View File

@ -0,0 +1,416 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import json
import math
from datetime import datetime, timedelta
from multiprocessing.util import Finalize
from celery import current_app, schedules
from celery.beat import ScheduleEntry, Scheduler
from celery.utils.log import get_logger
from sqlalchemy import select
from sqlalchemy.exc import DatabaseError, InterfaceError
from backend.app.task.enums import PeriodType, TaskSchedulerType
from backend.app.task.model.scheduler import TaskScheduler
from backend.app.task.schema.scheduler import CreateTaskSchedulerParam
from backend.app.task.utils.tzcrontab import TzAwareCrontab, crontab_verify
from backend.common.exception import errors
from backend.core.conf import settings
from backend.database.db import async_db_session
from backend.database.redis import redis_client
from backend.utils._await import run_await
from backend.utils.serializers import select_as_dict
from backend.utils.timezone import timezone
# 此计划程序必须比常规的 5 分钟更频繁地唤醒,因为它需要考虑对计划的外部更改
DEFAULT_MAX_INTERVAL = 5 # seconds
logger = get_logger('fba.schedulers')
class ModelEntry(ScheduleEntry):
"""任务调度实体"""
def __init__(self, model: TaskScheduler, app=None):
super().__init__(
app=app or current_app._get_current_object(),
name=model.name,
task=model.task,
)
try:
if (
model.type == TaskSchedulerType.INTERVAL
and model.interval_every is not None
and model.interval_period is not None
):
self.schedule = schedules.schedule(timedelta(**{model.interval_period: model.interval_every}))
elif model.type == TaskSchedulerType.CRONTAB and model.crontab_minute is not None:
self.schedule = TzAwareCrontab(
minute=model.crontab_minute,
hour=model.crontab_hour or '*',
day_of_week=model.crontab_day_of_week or '*',
day_of_month=model.crontab_day_of_month or '*',
month_of_year=model.crontab_month_of_year or '*',
)
else:
raise errors.NotFoundError(msg=f'{self.name} 计划为空!')
# logger.debug('Schedule: {}'.format(self.schedule))
except Exception as e:
logger.error(f'禁用计划为空的任务 {self.name},详情:{e}')
asyncio.create_task(self._disable(model))
try:
self.args = json.loads(model.args) if model.args else []
self.kwargs = json.loads(model.kwargs) if model.kwargs else {}
except ValueError as exc:
logger.error(f'禁用参数错误的任务:{self.name}error: {str(exc)}')
asyncio.create_task(self._disable(model))
self.options = {}
for option in ['queue', 'exchange', 'routing_key']:
value = getattr(model, option)
if value is None:
continue
self.options[option] = value
expires = getattr(model, 'expires_', None)
if expires:
if isinstance(expires, int):
self.options['expires'] = expires
elif isinstance(expires, datetime):
self.options['expires'] = timezone.from_datetime(expires)
if not model.last_run_time:
model.last_run_time = timezone.now()
if model.start_time:
model.last_run_time = timezone.from_datetime(model.start_time) - timedelta(days=365)
self.last_run_at = timezone.from_datetime(model.last_run_time)
self.options['periodic_task_name'] = model.name
self.model = model
async def _disable(self, model: TaskScheduler) -> None:
"""禁用任务"""
model.no_changes = True
self.model.enabled = self.enabled = model.enabled = False
async with async_db_session.begin():
setattr(model, 'enabled', False)
def is_due(self) -> tuple[bool, int | float]:
"""任务到期状态"""
if not self.model.enabled:
# 重新启用时延迟 5 秒
return schedules.schedstate(is_due=False, next=5)
# 仅在 'start_time' 之后运行
if self.model.start_time is not None:
now = timezone.now()
start_time = timezone.from_datetime(self.model.start_time)
if now < start_time:
delay = math.ceil((start_time - now).total_seconds())
return schedules.schedstate(is_due=False, next=delay)
# 一次性任务
if self.model.one_off and self.model.enabled and self.model.total_run_count > 0:
self.model.enabled = False
self.model.total_run_count = 0
self.model.no_changes = False
save_fields = ('enabled',)
run_await(self.save)(save_fields)
return schedules.schedstate(is_due=False, next=1000000000) # 高延迟,避免重新检查
return self.schedule.is_due(self.last_run_at)
def __next__(self):
self.model.last_run_time = timezone.now()
self.model.total_run_count += 1
self.model.no_changes = True
return self.__class__(self.model)
next = __next__
async def save(self, fields: tuple = ()):
"""
保存任务状态字段
:param fields: 要保存的其他字段
:return:
"""
async with async_db_session.begin() as db:
stmt = select(TaskScheduler).where(TaskScheduler.id == self.model.id).with_for_update()
query = await db.execute(stmt)
task = query.scalars().first()
if task:
for field in ['last_run_time', 'total_run_count', 'no_changes']:
setattr(task, field, getattr(self.model, field))
for field in fields:
setattr(task, field, getattr(self.model, field))
else:
logger.warning(f'任务 {self.model.name} 不存在,跳过更新')
@classmethod
async def from_entry(cls, name, app=None, **entry):
"""保存或更新本地任务调度"""
async with async_db_session.begin() as db:
stmt = select(TaskScheduler).where(TaskScheduler.name == name)
query = await db.execute(stmt)
task = query.scalars().first()
temp = await cls._unpack_fields(name, **entry)
if not task:
task = TaskScheduler(**temp)
db.add(task)
else:
for key, value in temp.items():
setattr(task, key, value)
res = cls(task, app=app)
return res
@staticmethod
async def to_model_schedule(name: str, task: str, schedule: schedules.schedule | TzAwareCrontab):
schedule = schedules.maybe_schedule(schedule)
async with async_db_session() as db:
if isinstance(schedule, schedules.schedule):
every = max(schedule.run_every.total_seconds(), 0)
spec = {
'type': TaskSchedulerType.INTERVAL.value,
'interval_every': every,
'interval_period': PeriodType.SECONDS.value,
}
stmt = select(TaskScheduler).filter_by(**spec)
query = await db.execute(stmt)
obj = query.scalars().first()
if not obj:
obj = TaskScheduler(**CreateTaskSchedulerParam(name=name, task=task, **spec).model_dump())
return obj
elif isinstance(schedule, schedules.crontab):
spec = {
'type': TaskSchedulerType.CRONTAB.value,
'crontab_minute': schedule._orig_minute
if crontab_verify('m', schedule._orig_minute, False)
else '*',
'crontab_hour': schedule._orig_hour if crontab_verify('h', schedule._orig_hour, False) else '*',
'crontab_day_of_week': schedule._orig_day_of_week
if crontab_verify('dom', schedule._orig_day_of_week, False)
else '*',
'crontab_day_of_month': schedule._orig_day_of_month
if crontab_verify('dom', schedule._orig_day_of_month, False)
else '*',
'crontab_month_of_year': schedule._orig_month_of_year
if crontab_verify('moy', schedule._orig_month_of_year, False)
else '*',
}
stmt = select(TaskScheduler).filter_by(**spec)
query = await db.execute(stmt)
obj = query.scalars().first()
if not obj:
obj = TaskScheduler(
**CreateTaskSchedulerParam(
name=name,
task=task,
**spec,
).model_dump()
)
return obj
else:
raise errors.NotFoundError(msg=f'暂不支持的计划类型:{schedule}')
@classmethod
async def _unpack_fields(
cls,
name: str,
task: str,
schedule: schedules.schedule | TzAwareCrontab,
args: tuple | None = None,
kwargs: dict | None = None,
options: dict = None,
**entry,
) -> dict:
model_schedule = await cls.to_model_schedule(name, task, schedule)
model_dict = select_as_dict(model_schedule)
for k in ['id', 'created_time', 'updated_time']:
try:
del model_dict[k]
except KeyError:
continue
model_dict.update(
args=json.dumps(args or []),
kwargs=json.dumps(kwargs or {}),
**cls._unpack_options(**options or {}),
**entry,
)
return model_dict
@classmethod
def _unpack_options(
cls,
queue: str = None,
exchange: str = None,
routing_key: str = None,
start_time: datetime = None,
expires: datetime = None,
expire_seconds: int = None,
one_off: bool = False,
) -> dict:
data = {
'queue': queue,
'exchange': exchange,
'routing_key': routing_key,
'start_time': start_time,
'expire_time': expires,
'expire_seconds': expire_seconds,
'one_off': one_off,
}
if expires:
if isinstance(expires, int):
data['expire_seconds'] = expires
elif isinstance(expires, timedelta):
data['expire_time'] = timezone.now() + expires
return data
class DatabaseScheduler(Scheduler):
Entry = ModelEntry
_schedule = None
_last_update = None
_initial_read = True
_heap_invalidated = False
def __init__(self, *args, **kwargs):
self.app = kwargs['app']
self._dirty = set()
super().__init__(*args, **kwargs)
self._finalize = Finalize(self, self.sync, exitpriority=5)
self.max_interval = kwargs.get('max_interval') or self.app.conf.beat_max_loop_interval or DEFAULT_MAX_INTERVAL
def setup_schedule(self):
"""重写父函数"""
logger.info('setup_schedule')
tasks = self.schedule
self.install_default_entries(tasks)
self.update_from_dict(self.app.conf.beat_schedule)
async def get_all_task_schedulers(self):
"""获取所有任务调度"""
async with async_db_session() as db:
logger.debug('DatabaseScheduler: Fetching database schedule')
stmt = select(TaskScheduler).where(TaskScheduler.enabled == 1)
query = await db.execute(stmt)
tasks = query.scalars().all()
s = {}
for task in tasks:
s[task.name] = self.Entry(task, app=self.app)
return s
def schedule_changed(self) -> bool:
"""任务调度变更状态"""
now = timezone.now()
last_update = run_await(redis_client.get)(f'{settings.CELERY_REDIS_PREFIX}:last_update')
if not last_update:
run_await(redis_client.set)(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
return False
last, ts = self._last_update, timezone.from_str(last_update)
try:
if ts and ts > (last if last else ts):
return True
finally:
self._last_update = now
def reserve(self, entry):
"""重写父函数"""
new_entry = next(entry)
# 需要按名称存储条目,因为条目可能会发生变化
self._dirty.add(new_entry.name)
return new_entry
def sync(self):
"""重写父函数"""
_tried = set()
_failed = set()
try:
while self._dirty:
name = self._dirty.pop()
try:
tasks = self.schedule
run_await(tasks[name].save)()
logger.debug(f'保存任务 {name} 最新状态到数据库')
_tried.add(name)
except KeyError as e:
logger.error(f'保存任务 {name} 最新状态失败:{e} ')
_failed.add(name)
except DatabaseError as e:
logger.exception('同步时出现数据库错误: %r', e)
except InterfaceError as e:
logger.warning(f'DatabaseScheduler InterfaceError{str(e)},等待下次调用时重试...')
finally:
# 请稍后重试(仅针对失败的)
self._dirty |= _failed
def update_from_dict(self, beat_dict: dict):
"""重写父函数"""
s = {}
for name, entry_fields in beat_dict.items():
try:
entry = run_await(self.Entry.from_entry)(name, app=self.app, **entry_fields)
if entry.model.enabled:
s[name] = entry
except Exception as e:
logger.error(f'添加任务 {name} 到数据库失败')
raise e
tasks = self.schedule
tasks.update(s)
def install_default_entries(self, data):
"""重写父函数"""
entries = {}
if self.app.conf.result_expires:
entries.setdefault(
'celery.backend_cleanup',
{
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('0', '4', '*'),
'options': {'expire_seconds': 12 * 3600},
},
)
self.update_from_dict(entries)
def schedules_equal(self, *args, **kwargs):
"""重写父函数"""
if self._heap_invalidated:
self._heap_invalidated = False
return False
return super().schedules_equal(*args, **kwargs)
@property
def schedule(self) -> dict[str, ModelEntry]:
"""获取任务调度"""
initial = update = False
if self._initial_read:
logger.debug('DatabaseScheduler: initial read')
initial = update = True
self._initial_read = False
elif self.schedule_changed():
logger.info('DatabaseScheduler: Schedule changed.')
update = True
if update:
logger.debug('beat: Synchronizing schedule...')
self.sync()
self._schedule = run_await(self.get_all_task_schedulers)()
# 计划已更改,使 Scheduler.tick 中的堆无效
if not initial:
self._heap = []
self._heap_invalidated = True
logger.debug(
'Current schedule:\n%s',
'\n'.join(repr(entry) for entry in self._schedule.values()),
)
# logger.debug(self._schedule)
return self._schedule

View File

@ -0,0 +1,86 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from datetime import datetime
from typing import Literal
from celery import schedules
from celery.schedules import ParseException, crontab_parser
from backend.common.exception import errors
from backend.utils.timezone import timezone
class TzAwareCrontab(schedules.crontab):
"""时区感知 Crontab"""
def __init__(self, minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', app=None):
super().__init__(
minute=minute,
hour=hour,
day_of_week=day_of_week,
day_of_month=day_of_month,
month_of_year=month_of_year,
nowfun=timezone.now,
app=app,
)
def is_due(self, last_run_at: datetime) -> tuple[bool, int | float]:
"""
任务到期状态
:param last_run_at: 最后运行时间
:return:
"""
rem_delta = self.remaining_estimate(last_run_at)
rem = max(rem_delta.total_seconds(), 0)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(self.now())
rem = max(rem_delta.total_seconds(), 0)
return schedules.schedstate(is_due=due, next=rem)
def __reduce__(self) -> tuple[type, tuple[str, str, str, str, str], None]:
return (
self.__class__,
(
self._orig_minute,
self._orig_hour,
self._orig_day_of_week,
self._orig_day_of_month,
self._orig_month_of_year,
),
None,
)
def crontab_verify(filed: Literal['m', 'h', 'dow', 'dom', 'moy'], value: str, raise_exc: bool = True) -> bool:
"""
验证 Celery crontab 表达式
:param filed: 验证的字段
:param value: 验证的值
:param raise_exc: 是否抛出异常
:return:
"""
valid = True
try:
match filed:
case 'm':
crontab_parser(60, 0).parse(value)
case 'h':
crontab_parser(24, 0).parse(value)
case 'dow':
crontab_parser(7, 0).parse(value)
case 'dom':
crontab_parser(31, 1).parse(value)
case 'moy':
crontab_parser(12, 1).parse(value)
case _:
raise errors.ServerError(msg=f'无效字段:{filed}')
except ParseException:
valid = False
if raise_exc:
raise errors.RequestError(msg=f'crontab 值 {value} 非法')
return valid

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import os
from dataclasses import dataclass
from typing import Annotated
@ -12,12 +11,12 @@ import granian
from rich.panel import Panel
from rich.text import Text
from sqlalchemy import text
from watchfiles import PythonFilter
from backend import console, get_version
from backend.common.enums import DataBaseType, PrimaryKeyType
from backend.common.exception.errors import BaseExceptionMixin
from backend.core.conf import settings
from backend.core.path_conf import BASE_PATH
from backend.database.db import async_db_session
from backend.plugin.tools import get_plugin_sql
from backend.utils.file_ops import install_git_plugin, install_zip_plugin, parse_sql_script
@ -45,10 +44,7 @@ def run(host: str, port: int, reload: bool, workers: int | None) -> None:
address=host,
port=port,
reload=not reload,
reload_ignore_paths=[
os.path.join(BASE_PATH.parent / '.venv'),
os.path.join(BASE_PATH / 'log'),
],
reload_filter=PythonFilter(),
workers=workers or 1,
).serve()

View File

@ -3,7 +3,6 @@
from functools import lru_cache
from typing import Any, Literal
from celery.schedules import crontab
from pydantic import model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@ -200,7 +199,6 @@ class Settings(BaseSettings):
# App Task
# .env Redis
CELERY_BROKER_REDIS_DATABASE: int
CELERY_BACKEND_REDIS_DATABASE: int
# .env RabbitMQ
# docker run -d --hostname fba-mq --name fba-mq -p 5672:5672 -p 15672:15672 rabbitmq:latest
@ -211,30 +209,9 @@ class Settings(BaseSettings):
# 基础配置
CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis'
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery:'
CELERY_BACKEND_REDIS_TIMEOUT: int = 5
CELERY_TASK_PACKAGES: list[str] = [
'app.task.celery_task',
'app.task.celery_task.db_log',
]
CELERY_REDIS_PREFIX: str = 'fba:celery'
CELERY_TASK_MAX_RETRIES: int = 5
# 定时任务配置
CELERY_SCHEDULE: dict[str, dict[str, Any]] = {
'exec-every-10-seconds': {
'task': 'task_demo_async',
'schedule': 10,
},
'exec-every-sunday': {
'task': 'delete_db_opera_log',
'schedule': crontab('0', '0', day_of_week='6'),
},
'exec-every-15-of-month': {
'task': 'delete_db_login_log',
'schedule': crontab('0', '0', day_of_month='15'),
},
}
# Plugin Code Generator
CODE_GENERATOR_DOWNLOAD_ZIP_FILENAME: str = 'fba_generator'

View File

@ -20,12 +20,10 @@ class RedisCli(Redis):
password=settings.REDIS_PASSWORD,
db=settings.REDIS_DATABASE,
socket_timeout=settings.REDIS_TIMEOUT,
socket_connect_timeout=5, # 连接超时
socket_connect_timeout=settings.REDIS_TIMEOUT,
socket_keepalive=True, # 保持连接
health_check_interval=30, # 健康检查间隔
decode_responses=True, # 转码 utf-8
retry_on_timeout=True, # 超时重试
max_connections=20, # 最大连接数
)
async def open(self) -> None:

View File

@ -24,7 +24,6 @@ OAUTH2_LINUX_DO_CLIENT_SECRET='test'
# Task
# Celery
CELERY_BROKER_REDIS_DATABASE=1
CELERY_BACKEND_REDIS_DATABASE=2
# Rabbitmq
CELERY_RABBITMQ_HOST='fba_rabbitmq'
CELERY_RABBITMQ_PORT=5672

View File

@ -13,44 +13,45 @@ requires-python = ">=3.10"
dynamic = ['version']
dependencies = [
"aiofiles>=24.1.0",
"alembic>=1.14.1",
"alembic>=1.16.3",
"asgi-correlation-id>=4.3.3",
"asgiref>=3.8.0",
"asgiref>=3.9.0",
"asyncmy>=0.2.10",
"asyncpg>=0.30.0",
"bcrypt>=4.2.1",
"cappa>=0.28.0",
"celery==5.3.6",
"celery>=5.5.3",
# When celery version < 6.0.0
# https://github.com/celery/celery/issues/7874
"celery-aio-pool==0.1.0rc8",
"cryptography>=44.0.0",
"dulwich>=0.22.8",
"celery-aio-pool>=0.1.0rc8",
"cryptography>=45.0.5",
"dulwich>=0.23.2",
"fast-captcha>=0.3.2",
"fastapi-limiter>=0.1.6",
"fastapi-pagination>=0.13.0",
"fastapi[standard]==0.115.11",
"fastapi[standard-no-fastapi-cloud-cli]>=0.116.0",
"flower>=2.0.0",
"gevent>=24.11.1",
"gevent>=25.5.1",
"granian>=2.4.0",
"ip2loc>=1.0.0",
"itsdangerous>=2.2.0",
"jinja2>=3.1.4",
"loguru>=0.7.3",
"msgspec>=0.19.0",
"path==17.0.0",
"psutil>=6.0.0",
"path>=17.0.0",
"psutil>=7.0.0",
"pwdlib>=0.2.1",
"pydantic>=2.11.0",
"pydantic-settings>=2.0.0",
"python-jose>=3.3.0",
"pydantic-settings>=2.10.0",
"pymysql>=1.1.1",
"python-jose>=3.5.0",
"python-socketio>=5.12.0",
"redis[hiredis]>=5.2.0",
"redis[hiredis]>=6.2.0",
"rtoml>=0.12.0",
"sqlalchemy-crud-plus>=1.10.0",
"sqlalchemy[asyncio]>=2.0.40",
"sqlparse>=0.5.3",
"user-agents==2.2.0",
"user-agents>=2.2.0",
]
[dependency-groups]

View File

@ -3,7 +3,7 @@
-e .
aiofiles==24.1.0
# via fastapi-best-architecture
alembic==1.15.1
alembic==1.16.3
# via fastapi-best-architecture
amqp==5.3.1
# via kombu
@ -16,7 +16,7 @@ anyio==4.9.0
# watchfiles
asgi-correlation-id==4.3.4
# via fastapi-best-architecture
asgiref==3.8.1
asgiref==3.9.1
# via fastapi-best-architecture
async-timeout==5.0.1 ; python_full_version < '3.11.3'
# via
@ -32,16 +32,16 @@ bidict==0.23.1
# via python-socketio
billiard==4.2.1
# via celery
cappa==0.28.0
cappa==0.28.1
# via fastapi-best-architecture
celery==5.3.6
celery==5.5.3
# via
# celery-aio-pool
# fastapi-best-architecture
# flower
celery-aio-pool==0.1.0rc8
# via fastapi-best-architecture
certifi==2025.1.31
certifi==2025.7.9
# via
# httpcore
# httpx
@ -51,18 +51,19 @@ cffi==1.17.1 ; platform_python_implementation != 'PyPy'
# gevent
cfgv==3.4.0
# via pre-commit
click==8.1.8
click==8.2.1
# via
# celery
# click-didyoumean
# click-plugins
# click-repl
# granian
# rich-toolkit
# typer
# uvicorn
click-didyoumean==0.3.1
# via celery
click-plugins==1.1.1
click-plugins==1.1.1.2
# via celery
click-repl==0.3.0
# via celery
@ -72,63 +73,63 @@ colorama==0.4.6 ; sys_platform == 'win32'
# loguru
# pytest
# uvicorn
cryptography==44.0.2
cryptography==45.0.5
# via fastapi-best-architecture
distlib==0.3.9
# via virtualenv
dnspython==2.7.0
# via email-validator
dulwich==0.22.8
dulwich==0.23.2
# via fastapi-best-architecture
ecdsa==0.19.1
# via python-jose
email-validator==2.2.0
# via fastapi
exceptiongroup==1.2.2 ; python_full_version < '3.11'
exceptiongroup==1.3.0 ; python_full_version < '3.11'
# via
# anyio
# pytest
fast-captcha==0.3.2
# via fastapi-best-architecture
fastapi==0.115.11
fastapi==0.116.0
# via
# fastapi-best-architecture
# fastapi-limiter
# fastapi-pagination
fastapi-cli==0.0.5
fastapi-cli==0.0.8
# via fastapi
fastapi-limiter==0.1.6
# via fastapi-best-architecture
fastapi-pagination==0.13.0
fastapi-pagination==0.13.3
# via fastapi-best-architecture
filelock==3.18.0
# via virtualenv
flower==2.0.1
# via fastapi-best-architecture
gevent==24.11.1
gevent==25.5.1
# via fastapi-best-architecture
granian==2.4.0
granian==2.4.1
# via fastapi-best-architecture
greenlet==3.1.1
greenlet==3.2.3
# via
# gevent
# sqlalchemy
h11==0.14.0
h11==0.16.0
# via
# httpcore
# uvicorn
# wsproto
hiredis==3.1.0
hiredis==3.2.1
# via redis
httpcore==1.0.7
httpcore==1.0.9
# via httpx
httptools==0.6.4
# via uvicorn
httpx==0.28.1
# via fastapi
humanize==4.12.2
humanize==4.12.3
# via flower
identify==2.6.9
identify==2.6.12
# via pre-commit
idna==3.10
# via
@ -145,11 +146,11 @@ jinja2==3.1.6
# via
# fastapi
# fastapi-best-architecture
kombu==5.5.1
kombu==5.5.4
# via celery
loguru==0.7.3
# via fastapi-best-architecture
mako==1.3.9
mako==1.3.10
# via alembic
markdown-it-py==3.0.0
# via rich
@ -163,63 +164,68 @@ msgspec==0.19.0
# via fastapi-best-architecture
nodeenv==1.9.1
# via pre-commit
packaging==24.2
packaging==25.0
# via
# asgi-correlation-id
# kombu
# pytest
# pytest-sugar
path==17.0.0
path==17.1.0
# via fastapi-best-architecture
pillow==11.1.0
pillow==11.3.0
# via fast-captcha
platformdirs==4.3.7
platformdirs==4.3.8
# via virtualenv
pluggy==1.5.0
pluggy==1.6.0
# via pytest
pre-commit==4.2.0
prometheus-client==0.21.1
prometheus-client==0.22.1
# via flower
prompt-toolkit==3.0.50
prompt-toolkit==3.0.51
# via click-repl
psutil==7.0.0
# via fastapi-best-architecture
pwdlib==0.2.1
# via fastapi-best-architecture
pyasn1==0.4.8
pyasn1==0.6.1
# via
# python-jose
# rsa
pycparser==2.22 ; platform_python_implementation != 'PyPy'
# via cffi
pydantic==2.11.0
pydantic==2.11.7
# via
# fastapi
# fastapi-best-architecture
# fastapi-pagination
# pydantic-settings
# sqlalchemy-crud-plus
pydantic-core==2.33.0
pydantic-core==2.33.2
# via pydantic
pydantic-settings==2.8.1
pydantic-settings==2.10.1
# via fastapi-best-architecture
pygments==2.19.1
# via rich
pytest==8.3.5
pygments==2.19.2
# via
# pytest
# rich
pymysql==1.1.1
# via fastapi-best-architecture
pytest==8.4.1
# via pytest-sugar
pytest-sugar==1.0.0
python-dateutil==2.9.0.post0
# via celery
python-dotenv==1.1.0
python-dotenv==1.1.1
# via
# pydantic-settings
# uvicorn
python-engineio==4.11.2
python-engineio==4.12.2
# via python-socketio
python-jose==3.4.0
python-jose==3.5.0
# via fastapi-best-architecture
python-multipart==0.0.20
# via fastapi
python-socketio==5.12.1
python-socketio==5.13.0
# via fastapi-best-architecture
pytz==2025.2
# via flower
@ -227,19 +233,22 @@ pyyaml==6.0.2
# via
# pre-commit
# uvicorn
redis==5.2.1
redis==6.2.0
# via
# fastapi-best-architecture
# fastapi-limiter
rich==13.9.4
rich==14.0.0
# via
# cappa
# rich-toolkit
# typer
rsa==4.9
rich-toolkit==0.14.8
# via fastapi-cli
rsa==4.9.1
# via python-jose
rtoml==0.12.0
# via fastapi-best-architecture
setuptools==78.1.0
setuptools==80.9.0
# via
# zope-event
# zope-interface
@ -253,7 +262,7 @@ six==1.17.0
# python-dateutil
sniffio==1.3.1
# via anyio
sqlalchemy==2.0.40
sqlalchemy==2.0.41
# via
# alembic
# fastapi-best-architecture
@ -262,51 +271,55 @@ sqlalchemy-crud-plus==1.10.0
# via fastapi-best-architecture
sqlparse==0.5.3
# via fastapi-best-architecture
starlette==0.46.1
starlette==0.46.2
# via
# asgi-correlation-id
# fastapi
termcolor==2.5.0
termcolor==3.1.0
# via pytest-sugar
tomli==2.2.1 ; python_full_version < '3.11'
# via pytest
tornado==6.4.2
# via
# alembic
# pytest
tornado==6.5.1
# via flower
type-lens==0.2.3
# via cappa
typer==0.15.2
typer==0.16.0
# via fastapi-cli
typing-extensions==4.13.0
typing-extensions==4.14.1
# via
# alembic
# anyio
# asgiref
# cappa
# exceptiongroup
# fastapi
# fastapi-pagination
# pydantic
# pydantic-core
# rich
# rich-toolkit
# sqlalchemy
# type-lens
# typer
# typing-inspection
# uvicorn
typing-inspection==0.4.0
# via pydantic
tzdata==2025.1
typing-inspection==0.4.1
# via
# celery
# kombu
# pydantic
# pydantic-settings
tzdata==2025.2
# via kombu
ua-parser==1.0.1
# via user-agents
ua-parser-builtins==0.18.0.post1
# via ua-parser
urllib3==2.4.0
urllib3==2.5.0
# via dulwich
user-agents==2.2.0
# via fastapi-best-architecture
uvicorn==0.34.0
uvicorn==0.35.0
# via
# fastapi
# fastapi-cli
@ -317,9 +330,9 @@ vine==5.1.0
# amqp
# celery
# kombu
virtualenv==20.29.3
virtualenv==20.31.2
# via pre-commit
watchfiles==1.0.4
watchfiles==1.1.0
# via uvicorn
wcwidth==0.2.13
# via prompt-toolkit
@ -329,7 +342,7 @@ win32-setctime==1.2.0 ; sys_platform == 'win32'
# via loguru
wsproto==1.2.0
# via simple-websocket
zope-event==5.0
zope-event==5.1
# via gevent
zope-interface==7.2
# via gevent

1992
uv.lock generated

File diff suppressed because it is too large Load Diff