Files
Wu Clan ce3be1db8e Add support for celery dynamic tasks (#715)
* Add support for celery dynamic tasks

* Update the celery conf

* Update the celery task tables name

* Refactor the celery task-related interfaces

* Optimize auto-discovery tasks

* Remove redundant config

* Refine the business codes

* Optimize crontab validation returns

* Update dependencies in pyproject toml

* Fix some bugs

* Update dependencies

* Update the version to 1.7.0

* Fix update and delete event
2025-07-11 07:54:33 +08:00

67 lines
2.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from redis.asyncio import Redis
from redis.exceptions import AuthenticationError, TimeoutError
from backend.common.log import log
from backend.core.conf import settings
class RedisCli(Redis):
"""Redis 客户端"""
def __init__(self) -> None:
"""初始化 Redis 客户端"""
super(RedisCli, self).__init__(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=settings.REDIS_PASSWORD,
db=settings.REDIS_DATABASE,
socket_timeout=settings.REDIS_TIMEOUT,
socket_connect_timeout=settings.REDIS_TIMEOUT,
socket_keepalive=True, # 保持连接
health_check_interval=30, # 健康检查间隔
decode_responses=True, # 转码 utf-8
)
async def open(self) -> None:
"""触发初始化连接"""
try:
await self.ping()
except TimeoutError:
log.error('❌ 数据库 redis 连接超时')
sys.exit()
except AuthenticationError:
log.error('❌ 数据库 redis 连接认证失败')
sys.exit()
except Exception as e:
log.error('❌ 数据库 redis 连接异常 {}', e)
sys.exit()
async def delete_prefix(self, prefix: str, exclude: str | list[str] | None = None) -> None:
"""
删除指定前缀的所有 key
:param prefix: 前缀
:param exclude: 排除的 key
:return:
"""
keys = []
async for key in self.scan_iter(match=f'{prefix}*'):
if isinstance(exclude, str):
if key != exclude:
keys.append(key)
elif isinstance(exclude, list):
if key not in exclude:
keys.append(key)
else:
keys.append(key)
if keys:
await self.delete(*keys)
# 创建 redis 客户端单例
redis_client: RedisCli = RedisCli()