mirror of
https://github.com/fastapi-practices/fastapi_best_architecture.git
synced 2026-03-13 09:31:31 +08:00
Optimize dynamic config loading implementation (#1022)
This commit is contained in:
@@ -1,19 +1,19 @@
|
||||
from collections.abc import Callable
|
||||
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from backend.core.conf import settings
|
||||
from backend.database.db import async_engine
|
||||
from backend.plugin.config.crud.crud_config import config_dao
|
||||
from backend.plugin.config.enums import ConfigType
|
||||
from backend.utils.serializers import select_list_serialize
|
||||
|
||||
_sys_config_table_exists: bool | None = None
|
||||
|
||||
|
||||
async def check_sys_config_table_exists() -> bool:
|
||||
"""
|
||||
检查 sys_config 表是否存在
|
||||
|
||||
:return:
|
||||
"""
|
||||
"""检查 sys_config 表是否存在"""
|
||||
global _sys_config_table_exists
|
||||
if _sys_config_table_exists is None:
|
||||
async with async_engine.begin() as conn:
|
||||
@@ -21,50 +21,60 @@ async def check_sys_config_table_exists() -> bool:
|
||||
return _sys_config_table_exists
|
||||
|
||||
|
||||
async def load_user_security_config(db: AsyncSession) -> None: # noqa: C901
|
||||
def _to_bool(value: str) -> bool:
|
||||
"""将字符串转换为布尔值"""
|
||||
return value == 'true'
|
||||
|
||||
|
||||
async def _load_config(
|
||||
db: AsyncSession,
|
||||
config_type: ConfigType,
|
||||
mapping: dict[str, Callable],
|
||||
status_key: str,
|
||||
) -> None:
|
||||
"""
|
||||
根据配置类型加载配置
|
||||
|
||||
:param db: 数据库会话
|
||||
:param config_type: 配置类型枚举
|
||||
:param mapping: 配置映射 {config_key: converter}
|
||||
:param status_key: 状态键
|
||||
:return:
|
||||
"""
|
||||
if not await check_sys_config_table_exists():
|
||||
return
|
||||
|
||||
dynamic_config = await config_dao.get_all(db, config_type)
|
||||
if not dynamic_config:
|
||||
return
|
||||
|
||||
configs = {dc['key']: dc['value'] for dc in select_list_serialize(dynamic_config)}
|
||||
if configs.get(status_key, '1') == '0':
|
||||
return
|
||||
|
||||
for config_key, converter in mapping.items():
|
||||
if config_key in configs:
|
||||
setattr(settings, config_key, converter(configs[config_key]))
|
||||
|
||||
|
||||
async def load_user_security_config(db: AsyncSession) -> None:
|
||||
"""
|
||||
获取用户安全配置
|
||||
|
||||
:param db: 数据库会话
|
||||
:return:
|
||||
"""
|
||||
if not await check_sys_config_table_exists():
|
||||
return
|
||||
|
||||
from backend.plugin.config.crud.crud_config import config_dao
|
||||
from backend.plugin.config.enums import ConfigType
|
||||
|
||||
dynamic_config = await config_dao.get_all(db, ConfigType.user_security)
|
||||
|
||||
if dynamic_config:
|
||||
security_config_status_key = 'USER_SECURITY_CONFIG_STATUS'
|
||||
lock_threshold_key = 'USER_LOCK_THRESHOLD'
|
||||
lock_seconds_key = 'USER_LOCK_SECONDS'
|
||||
password_expiry_days_key = 'USER_PASSWORD_EXPIRY_DAYS'
|
||||
password_reminder_days_key = 'USER_PASSWORD_REMINDER_DAYS'
|
||||
password_history_check_count_key = 'USER_PASSWORD_HISTORY_CHECK_COUNT'
|
||||
password_min_length_key = 'USER_PASSWORD_MIN_LENGTH'
|
||||
password_max_length_key = 'USER_PASSWORD_MAX_LENGTH'
|
||||
password_require_special_char_key = 'USER_PASSWORD_REQUIRE_SPECIAL_CHAR'
|
||||
|
||||
configs = {dc['key']: dc['value'] for dc in select_list_serialize(dynamic_config)}
|
||||
if int(configs.get(security_config_status_key)):
|
||||
if lock_threshold_key in configs:
|
||||
settings.USER_LOCK_THRESHOLD = int(configs[lock_threshold_key])
|
||||
if lock_seconds_key in configs:
|
||||
settings.USER_LOCK_SECONDS = int(configs[lock_seconds_key])
|
||||
if password_expiry_days_key in configs:
|
||||
settings.USER_PASSWORD_EXPIRY_DAYS = int(configs[password_expiry_days_key])
|
||||
if password_reminder_days_key in configs:
|
||||
settings.USER_PASSWORD_REMINDER_DAYS = int(configs[password_reminder_days_key])
|
||||
if password_history_check_count_key in configs:
|
||||
settings.USER_PASSWORD_HISTORY_CHECK_COUNT = int(configs[password_history_check_count_key])
|
||||
if password_min_length_key in configs:
|
||||
settings.USER_PASSWORD_MIN_LENGTH = int(configs[password_min_length_key])
|
||||
if password_max_length_key in configs:
|
||||
settings.USER_PASSWORD_MAX_LENGTH = int(configs[password_max_length_key])
|
||||
if password_require_special_char_key in configs:
|
||||
settings.USER_PASSWORD_REQUIRE_SPECIAL_CHAR = configs[password_require_special_char_key] == 'true'
|
||||
mapping = {
|
||||
'USER_LOCK_THRESHOLD': int,
|
||||
'USER_LOCK_SECONDS': int,
|
||||
'USER_PASSWORD_EXPIRY_DAYS': int,
|
||||
'USER_PASSWORD_REMINDER_DAYS': int,
|
||||
'USER_PASSWORD_HISTORY_CHECK_COUNT': int,
|
||||
'USER_PASSWORD_MIN_LENGTH': int,
|
||||
'USER_PASSWORD_MAX_LENGTH': int,
|
||||
'USER_PASSWORD_REQUIRE_SPECIAL_CHAR': _to_bool,
|
||||
}
|
||||
await _load_config(db, ConfigType.user_security, mapping, 'USER_SECURITY_CONFIG_STATUS')
|
||||
|
||||
|
||||
async def load_login_config(db: AsyncSession) -> None:
|
||||
@@ -74,21 +84,10 @@ async def load_login_config(db: AsyncSession) -> None:
|
||||
:param db: 数据库会话
|
||||
:return:
|
||||
"""
|
||||
if not await check_sys_config_table_exists():
|
||||
return
|
||||
|
||||
from backend.plugin.config.crud.crud_config import config_dao
|
||||
from backend.plugin.config.enums import ConfigType
|
||||
|
||||
dynamic_config = await config_dao.get_all(db, ConfigType.login)
|
||||
|
||||
if dynamic_config:
|
||||
login_config_status_key = 'LOGIN_CONFIG_STATUS'
|
||||
login_captcha_enabled_key = 'LOGIN_CAPTCHA_ENABLED'
|
||||
|
||||
configs = {dc['key']: dc['value'] for dc in select_list_serialize(dynamic_config)}
|
||||
if int(configs.get(login_config_status_key)) and login_captcha_enabled_key in configs:
|
||||
settings.LOGIN_CAPTCHA_ENABLED = configs[login_captcha_enabled_key] == 'true'
|
||||
mapping = {
|
||||
'LOGIN_CAPTCHA_ENABLED': _to_bool,
|
||||
}
|
||||
await _load_config(db, ConfigType.login, mapping, 'LOGIN_CONFIG_STATUS')
|
||||
|
||||
|
||||
async def load_email_config(db: AsyncSession) -> None:
|
||||
@@ -98,30 +97,11 @@ async def load_email_config(db: AsyncSession) -> None:
|
||||
:param db: 数据库会话
|
||||
:return:
|
||||
"""
|
||||
if not await check_sys_config_table_exists():
|
||||
return
|
||||
|
||||
from backend.plugin.config.crud.crud_config import config_dao
|
||||
from backend.plugin.config.enums import ConfigType
|
||||
|
||||
dynamic_config = await config_dao.get_all(db, ConfigType.email)
|
||||
|
||||
if dynamic_config:
|
||||
email_config_status_key = 'EMAIL_CONFIG_STATUS'
|
||||
host_key = 'EMAIL_HOST'
|
||||
port_key = 'EMAIL_PORT'
|
||||
ssl_key = 'EMAIL_SSL'
|
||||
username_key = 'EMAIL_USERNAME'
|
||||
password_key = 'EMAIL_PASSWORD'
|
||||
|
||||
configs = {dc['key']: dc['value'] for dc in select_list_serialize(dynamic_config)}
|
||||
if int(configs.get(email_config_status_key)):
|
||||
settings.EMAIL_HOST = str(configs[host_key])
|
||||
if configs.get(port_key):
|
||||
settings.EMAIL_PORT = int(configs[port_key])
|
||||
if configs.get(ssl_key):
|
||||
settings.EMAIL_SSL = configs[ssl_key] == 'true'
|
||||
if configs.get(username_key):
|
||||
settings.EMAIL_USERNAME = str(configs[username_key])
|
||||
if configs.get(password_key):
|
||||
settings.EMAIL_PASSWORD = str(configs[password_key])
|
||||
mapping = {
|
||||
'EMAIL_HOST': str,
|
||||
'EMAIL_PORT': int,
|
||||
'EMAIL_SSL': _to_bool,
|
||||
'EMAIL_USERNAME': str,
|
||||
'EMAIL_PASSWORD': str,
|
||||
}
|
||||
await _load_config(db, ConfigType.email, mapping, 'EMAIL_CONFIG_STATUS')
|
||||
|
||||
Reference in New Issue
Block a user