Fix task issues and update crontab expressions (#1080)

This commit is contained in:
Wu Clan
2026-02-16 14:37:56 +08:00
committed by GitHub
parent 0c9eafbbb9
commit eb9399e14f
6 changed files with 29 additions and 75 deletions

View File

@@ -43,7 +43,7 @@ async def get_task_scheduler(
)
async def get_task_scheduler_paginated(
db: CurrentSession,
name: Annotated[int | None, Path(description='任务调度名称')] = None,
name: Annotated[str | None, Query(description='任务调度名称')] = None,
type: Annotated[int | None, Query(description='任务调度类型')] = None,
) -> ResponseSchemaModel[PageData[GetTaskSchedulerDetail]]:
page_data = await task_scheduler_service.get_list(db=db, name=name, type=type)

View File

@@ -33,7 +33,7 @@ class TaskScheduler(Base):
type: Mapped[int] = mapped_column(comment='调度类型0间隔 1定时')
interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数')
interval_period: Mapped[str | None] = mapped_column(sa.String(256), comment='任务运行之间的周期类型')
crontab: Mapped[str | None] = mapped_column(sa.String(64), default='* * * * *', comment='任务运行的 Crontab 计划')
crontab: Mapped[str | None] = mapped_column(sa.String(64), default='* * * * *', comment='Crontab 表达式')
one_off: Mapped[bool] = mapped_column(default=False, comment='是否仅运行一次')
enabled: Mapped[bool] = mapped_column(default=True, comment='是否启用任务')
total_run_count: Mapped[int] = mapped_column(default=0, comment='任务触发的总次数')

View File

@@ -23,7 +23,7 @@ class TaskSchedulerSchemeBase(SchemaBase):
type: TaskSchedulerType = Field(description='任务调度类型0间隔 1定时')
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
crontab: str = Field(default='* * * * *', description='运行的 Crontab 表达式')
crontab: str = Field(default='* * * * *', description='Crontab 表达式')
one_off: bool = Field(default=False, description='是否仅运行一次')
remark: str | None = Field(default=None, description='备注')

View File

@@ -92,7 +92,7 @@ class TaskSchedulerService:
raise errors.NotFoundError(msg='任务调度不存在')
if task_scheduler.name != obj.name and await task_scheduler_dao.get_by_name(db, obj.name):
raise errors.ConflictError(msg='任务调度已存在')
if task_scheduler.type == TaskSchedulerType.CRONTAB:
if obj.type == TaskSchedulerType.CRONTAB:
crontab_verify(obj.crontab)
count = await task_scheduler_dao.update(db, pk, obj)
return count

View File

@@ -56,14 +56,7 @@ class ModelEntry(ScheduleEntry):
):
self.schedule = schedules.schedule(timedelta(**{model.interval_period: model.interval_every}))
elif model.type == TaskSchedulerType.CRONTAB and model.crontab is not None:
crontab_split = model.crontab.split(' ')
self.schedule = TzAwareCrontab(
minute=crontab_split[0],
hour=crontab_split[1],
day_of_week=crontab_split[2],
day_of_month=crontab_split[3],
month_of_year=crontab_split[4],
)
self.schedule = TzAwareCrontab.from_string(model.crontab)
else:
raise errors.NotFoundError(msg=f'{self.name} 计划为空!')
# logger.debug('Schedule: {}'.format(self.schedule))
@@ -85,12 +78,10 @@ class ModelEntry(ScheduleEntry):
continue
self.options[option] = value
expires = getattr(model, 'expires_', None)
if expires:
if isinstance(expires, int):
self.options['expires'] = expires
elif isinstance(expires, datetime):
self.options['expires'] = timezone.from_datetime(expires)
if model.expire_seconds is not None:
self.options['expires'] = model.expire_seconds
elif model.expire_time is not None:
self.options['expires'] = timezone.from_datetime(model.expire_time)
if not model.last_run_time:
model.last_run_time = timezone.now()
@@ -105,10 +96,15 @@ class ModelEntry(ScheduleEntry):
"""禁用任务"""
model.no_changes = True
self.model.enabled = self.enabled = model.enabled = False
async with async_db_session.begin():
model.enabled = False
async with async_db_session.begin() as db:
stmt = select(TaskScheduler).where(TaskScheduler.id == model.id)
query = await db.execute(stmt)
task = query.scalars().first()
if task:
task.no_changes = True
task.enabled = False
def is_due(self) -> tuple[bool, int | float]:
def is_due(self) -> tuple[bool, int | float | datetime]:
"""任务到期状态"""
if not self.model.enabled:
# 重新启用时延迟 5 秒
@@ -196,7 +192,7 @@ class ModelEntry(ScheduleEntry):
if not obj:
obj = TaskScheduler(**CreateTaskSchedulerParam(task=task, **spec).model_dump())
elif isinstance(schedule, schedules.crontab):
crontab = f'{schedule._orig_minute} {schedule._orig_hour} {schedule._orig_day_of_week} {schedule._orig_day_of_month} {schedule._orig_month_of_year}' # noqa: E501
crontab = f'{schedule._orig_minute} {schedule._orig_hour} {schedule._orig_day_of_month} {schedule._orig_month_of_year} {schedule._orig_day_of_week}' # noqa: E501
crontab_verify(crontab)
spec = {
'name': name,
@@ -256,7 +252,7 @@ class ModelEntry(ScheduleEntry):
'exchange': exchange,
'routing_key': routing_key,
'start_time': start_time,
'expire_time': expires,
'expire_time': None,
'expire_seconds': expire_seconds,
'one_off': one_off,
}
@@ -265,6 +261,8 @@ class ModelEntry(ScheduleEntry):
data['expire_seconds'] = expires
elif isinstance(expires, timedelta):
data['expire_time'] = timezone.now() + expires
elif isinstance(expires, datetime):
data['expire_time'] = expires
return data
@@ -288,20 +286,6 @@ class DatabaseScheduler(Scheduler):
self._finalize = Finalize(self, self.sync, exitpriority=5)
self.max_interval = kwargs.get('max_interval') or self.app.conf.beat_max_loop_interval or DEFAULT_MAX_INTERVAL
def install_default_entries(self, data) -> None: # noqa: ANN001
"""重写父函数"""
entries = {}
if self.app.conf.result_expires:
entries.setdefault(
'celery.backend_cleanup',
{
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('0', '4', '*'),
'options': {'expire_seconds': 12 * 3600},
},
)
self.update_from_dict(entries)
def schedules_equal(self, *args, **kwargs) -> bool:
"""重写父函数"""
if self._heap_invalidated:
@@ -367,7 +351,7 @@ class DatabaseScheduler(Scheduler):
def update_from_dict(self, beat_dict: dict) -> None:
"""重写父函数"""
s = {}
name = None
try:
for name, entry_fields in beat_dict.items():
entry = run_await(self.Entry.from_entry)(name, app=self.app, **entry_fields)

View File

@@ -1,7 +1,5 @@
from datetime import datetime
from celery import schedules
from celery.schedules import ParseException, crontab
from celery.schedules import ParseException
from backend.common.exception import errors
from backend.utils.timezone import timezone
@@ -21,46 +19,18 @@ class TzAwareCrontab(schedules.crontab):
app=app,
)
def is_due(self, last_run_at: datetime) -> tuple[bool, int | float]:
"""
任务到期状态
:param last_run_at: 最后运行时间
:return:
"""
rem_delta = self.remaining_estimate(last_run_at)
rem = max(rem_delta.total_seconds(), 0)
due = rem == 0
if due:
rem_delta = self.remaining_estimate(self.now())
rem = max(rem_delta.total_seconds(), 0)
return schedules.schedstate(is_due=due, next=rem)
def __reduce__(self) -> tuple[type, tuple[str, str, str, str, str], None]:
return (
self.__class__,
(
self._orig_minute,
self._orig_hour,
self._orig_day_of_week,
self._orig_day_of_month,
self._orig_month_of_year,
),
None,
)
def crontab_verify(crontab_str: str) -> None:
def crontab_verify(crontab: str) -> None:
"""
验证 Celery crontab 表达式
验证标准 crontab 表达式
:param crontab_str: 计划表达式
:param crontab: 标准 crontab 表达式
:return:
"""
crontab_split = crontab_str.split(' ')
crontab_split = crontab.split(' ')
if len(crontab_split) != 5:
raise errors.RequestError(msg='Crontab 表达式非法')
try:
crontab(*crontab_split)
except ParseException:
TzAwareCrontab.from_string(crontab)
except (ParseException, ValueError):
raise errors.RequestError(msg='Crontab 表达式非法')