From eb9399e14f322367c70b47651ba2cabcdbeb46a5 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Mon, 16 Feb 2026 14:37:56 +0800 Subject: [PATCH] Fix task issues and update crontab expressions (#1080) --- backend/app/task/api/v1/scheduler.py | 2 +- backend/app/task/model/scheduler.py | 2 +- backend/app/task/schema/scheduler.py | 2 +- backend/app/task/service/scheduler_service.py | 2 +- backend/app/task/utils/schedulers.py | 52 +++++++------------ backend/app/task/utils/tzcrontab.py | 44 +++------------- 6 files changed, 29 insertions(+), 75 deletions(-) diff --git a/backend/app/task/api/v1/scheduler.py b/backend/app/task/api/v1/scheduler.py index 7dd94ead..aa48d9ca 100644 --- a/backend/app/task/api/v1/scheduler.py +++ b/backend/app/task/api/v1/scheduler.py @@ -43,7 +43,7 @@ async def get_task_scheduler( ) async def get_task_scheduler_paginated( db: CurrentSession, - name: Annotated[int | None, Path(description='任务调度名称')] = None, + name: Annotated[str | None, Query(description='任务调度名称')] = None, type: Annotated[int | None, Query(description='任务调度类型')] = None, ) -> ResponseSchemaModel[PageData[GetTaskSchedulerDetail]]: page_data = await task_scheduler_service.get_list(db=db, name=name, type=type) diff --git a/backend/app/task/model/scheduler.py b/backend/app/task/model/scheduler.py index 77f3374a..7495b0dd 100644 --- a/backend/app/task/model/scheduler.py +++ b/backend/app/task/model/scheduler.py @@ -33,7 +33,7 @@ class TaskScheduler(Base): type: Mapped[int] = mapped_column(comment='调度类型(0间隔 1定时)') interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数') interval_period: Mapped[str | None] = mapped_column(sa.String(256), comment='任务运行之间的周期类型') - crontab: Mapped[str | None] = mapped_column(sa.String(64), default='* * * * *', comment='任务运行的 Crontab 计划') + crontab: Mapped[str | None] = mapped_column(sa.String(64), default='* * * * *', comment='Crontab 表达式') one_off: Mapped[bool] = mapped_column(default=False, comment='是否仅运行一次') enabled: Mapped[bool] = mapped_column(default=True, comment='是否启用任务') total_run_count: Mapped[int] = mapped_column(default=0, comment='任务触发的总次数') diff --git a/backend/app/task/schema/scheduler.py b/backend/app/task/schema/scheduler.py index c1f60bc1..d6ac38af 100644 --- a/backend/app/task/schema/scheduler.py +++ b/backend/app/task/schema/scheduler.py @@ -23,7 +23,7 @@ class TaskSchedulerSchemeBase(SchemaBase): type: TaskSchedulerType = Field(description='任务调度类型(0间隔 1定时)') interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数') interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型') - crontab: str = Field(default='* * * * *', description='运行的 Crontab 表达式') + crontab: str = Field(default='* * * * *', description='Crontab 表达式') one_off: bool = Field(default=False, description='是否仅运行一次') remark: str | None = Field(default=None, description='备注') diff --git a/backend/app/task/service/scheduler_service.py b/backend/app/task/service/scheduler_service.py index 85276a3e..68b5fb32 100644 --- a/backend/app/task/service/scheduler_service.py +++ b/backend/app/task/service/scheduler_service.py @@ -92,7 +92,7 @@ class TaskSchedulerService: raise errors.NotFoundError(msg='任务调度不存在') if task_scheduler.name != obj.name and await task_scheduler_dao.get_by_name(db, obj.name): raise errors.ConflictError(msg='任务调度已存在') - if task_scheduler.type == TaskSchedulerType.CRONTAB: + if obj.type == TaskSchedulerType.CRONTAB: crontab_verify(obj.crontab) count = await task_scheduler_dao.update(db, pk, obj) return count diff --git a/backend/app/task/utils/schedulers.py b/backend/app/task/utils/schedulers.py index a4152f9d..1aca18ad 100644 --- a/backend/app/task/utils/schedulers.py +++ b/backend/app/task/utils/schedulers.py @@ -56,14 +56,7 @@ class ModelEntry(ScheduleEntry): ): self.schedule = schedules.schedule(timedelta(**{model.interval_period: model.interval_every})) elif model.type == TaskSchedulerType.CRONTAB and model.crontab is not None: - crontab_split = model.crontab.split(' ') - self.schedule = TzAwareCrontab( - minute=crontab_split[0], - hour=crontab_split[1], - day_of_week=crontab_split[2], - day_of_month=crontab_split[3], - month_of_year=crontab_split[4], - ) + self.schedule = TzAwareCrontab.from_string(model.crontab) else: raise errors.NotFoundError(msg=f'{self.name} 计划为空!') # logger.debug('Schedule: {}'.format(self.schedule)) @@ -85,12 +78,10 @@ class ModelEntry(ScheduleEntry): continue self.options[option] = value - expires = getattr(model, 'expires_', None) - if expires: - if isinstance(expires, int): - self.options['expires'] = expires - elif isinstance(expires, datetime): - self.options['expires'] = timezone.from_datetime(expires) + if model.expire_seconds is not None: + self.options['expires'] = model.expire_seconds + elif model.expire_time is not None: + self.options['expires'] = timezone.from_datetime(model.expire_time) if not model.last_run_time: model.last_run_time = timezone.now() @@ -105,10 +96,15 @@ class ModelEntry(ScheduleEntry): """禁用任务""" model.no_changes = True self.model.enabled = self.enabled = model.enabled = False - async with async_db_session.begin(): - model.enabled = False + async with async_db_session.begin() as db: + stmt = select(TaskScheduler).where(TaskScheduler.id == model.id) + query = await db.execute(stmt) + task = query.scalars().first() + if task: + task.no_changes = True + task.enabled = False - def is_due(self) -> tuple[bool, int | float]: + def is_due(self) -> tuple[bool, int | float | datetime]: """任务到期状态""" if not self.model.enabled: # 重新启用时延迟 5 秒 @@ -196,7 +192,7 @@ class ModelEntry(ScheduleEntry): if not obj: obj = TaskScheduler(**CreateTaskSchedulerParam(task=task, **spec).model_dump()) elif isinstance(schedule, schedules.crontab): - crontab = f'{schedule._orig_minute} {schedule._orig_hour} {schedule._orig_day_of_week} {schedule._orig_day_of_month} {schedule._orig_month_of_year}' # noqa: E501 + crontab = f'{schedule._orig_minute} {schedule._orig_hour} {schedule._orig_day_of_month} {schedule._orig_month_of_year} {schedule._orig_day_of_week}' # noqa: E501 crontab_verify(crontab) spec = { 'name': name, @@ -256,7 +252,7 @@ class ModelEntry(ScheduleEntry): 'exchange': exchange, 'routing_key': routing_key, 'start_time': start_time, - 'expire_time': expires, + 'expire_time': None, 'expire_seconds': expire_seconds, 'one_off': one_off, } @@ -265,6 +261,8 @@ class ModelEntry(ScheduleEntry): data['expire_seconds'] = expires elif isinstance(expires, timedelta): data['expire_time'] = timezone.now() + expires + elif isinstance(expires, datetime): + data['expire_time'] = expires return data @@ -288,20 +286,6 @@ class DatabaseScheduler(Scheduler): self._finalize = Finalize(self, self.sync, exitpriority=5) self.max_interval = kwargs.get('max_interval') or self.app.conf.beat_max_loop_interval or DEFAULT_MAX_INTERVAL - def install_default_entries(self, data) -> None: # noqa: ANN001 - """重写父函数""" - entries = {} - if self.app.conf.result_expires: - entries.setdefault( - 'celery.backend_cleanup', - { - 'task': 'celery.backend_cleanup', - 'schedule': schedules.crontab('0', '4', '*'), - 'options': {'expire_seconds': 12 * 3600}, - }, - ) - self.update_from_dict(entries) - def schedules_equal(self, *args, **kwargs) -> bool: """重写父函数""" if self._heap_invalidated: @@ -367,7 +351,7 @@ class DatabaseScheduler(Scheduler): def update_from_dict(self, beat_dict: dict) -> None: """重写父函数""" s = {} - + name = None try: for name, entry_fields in beat_dict.items(): entry = run_await(self.Entry.from_entry)(name, app=self.app, **entry_fields) diff --git a/backend/app/task/utils/tzcrontab.py b/backend/app/task/utils/tzcrontab.py index c3bbb763..ddd9a341 100644 --- a/backend/app/task/utils/tzcrontab.py +++ b/backend/app/task/utils/tzcrontab.py @@ -1,7 +1,5 @@ -from datetime import datetime - from celery import schedules -from celery.schedules import ParseException, crontab +from celery.schedules import ParseException from backend.common.exception import errors from backend.utils.timezone import timezone @@ -21,46 +19,18 @@ class TzAwareCrontab(schedules.crontab): app=app, ) - def is_due(self, last_run_at: datetime) -> tuple[bool, int | float]: - """ - 任务到期状态 - :param last_run_at: 最后运行时间 - :return: - """ - rem_delta = self.remaining_estimate(last_run_at) - rem = max(rem_delta.total_seconds(), 0) - due = rem == 0 - if due: - rem_delta = self.remaining_estimate(self.now()) - rem = max(rem_delta.total_seconds(), 0) - return schedules.schedstate(is_due=due, next=rem) - - def __reduce__(self) -> tuple[type, tuple[str, str, str, str, str], None]: - return ( - self.__class__, - ( - self._orig_minute, - self._orig_hour, - self._orig_day_of_week, - self._orig_day_of_month, - self._orig_month_of_year, - ), - None, - ) - - -def crontab_verify(crontab_str: str) -> None: +def crontab_verify(crontab: str) -> None: """ - 验证 Celery crontab 表达式 + 验证标准 crontab 表达式 - :param crontab_str: 计划表达式 + :param crontab: 标准 crontab 表达式 :return: """ - crontab_split = crontab_str.split(' ') + crontab_split = crontab.split(' ') if len(crontab_split) != 5: raise errors.RequestError(msg='Crontab 表达式非法') try: - crontab(*crontab_split) - except ParseException: + TzAwareCrontab.from_string(crontab) + except (ParseException, ValueError): raise errors.RequestError(msg='Crontab 表达式非法')