mirror of
https://github.com/fastapi-practices/fastapi_best_architecture.git
synced 2025-08-26 04:33:09 +08:00
add dictionary management interface (#127)
* add dictionary management interface * fix dictionary data one-to-many annotation
This commit is contained in:
@ -37,7 +37,7 @@ config.set_main_option('sqlalchemy.url', SQLALCHEMY_DATABASE_URL)
|
|||||||
|
|
||||||
|
|
||||||
def include_name(name, type_, parent_names):
|
def include_name(name, type_, parent_names):
|
||||||
if type_ == "table":
|
if type_ == 'table':
|
||||||
return name in target_metadata.tables
|
return name in target_metadata.tables
|
||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
|
@ -13,6 +13,8 @@ from backend.app.api.v1.config import router as config_router
|
|||||||
from backend.app.api.v1.login_log import router as login_log_router
|
from backend.app.api.v1.login_log import router as login_log_router
|
||||||
from backend.app.api.v1.opera_log import router as opera_log_router
|
from backend.app.api.v1.opera_log import router as opera_log_router
|
||||||
from backend.app.api.v1.task_demo import router as task_demo_router
|
from backend.app.api.v1.task_demo import router as task_demo_router
|
||||||
|
from backend.app.api.v1.dict_type import router as dict_type_router
|
||||||
|
from backend.app.api.v1.dict_data import router as dict_data_router
|
||||||
|
|
||||||
v1 = APIRouter(prefix='/v1')
|
v1 = APIRouter(prefix='/v1')
|
||||||
|
|
||||||
@ -27,3 +29,5 @@ v1.include_router(config_router, prefix='/configs', tags=['系统配置'])
|
|||||||
v1.include_router(login_log_router, prefix='/login_logs', tags=['登录日志管理'])
|
v1.include_router(login_log_router, prefix='/login_logs', tags=['登录日志管理'])
|
||||||
v1.include_router(opera_log_router, prefix='/opera_logs', tags=['操作日志管理'])
|
v1.include_router(opera_log_router, prefix='/opera_logs', tags=['操作日志管理'])
|
||||||
v1.include_router(task_demo_router, prefix='/tasks', tags=['任务管理'])
|
v1.include_router(task_demo_router, prefix='/tasks', tags=['任务管理'])
|
||||||
|
v1.include_router(dict_type_router, prefix='/dict_types', tags=['字典类型管理'])
|
||||||
|
v1.include_router(dict_data_router, prefix='/dict_datas', tags=['字典数据管理'])
|
||||||
|
56
backend/app/api/v1/dict_data.py
Normal file
56
backend/app/api/v1/dict_data.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Query, Request
|
||||||
|
|
||||||
|
from backend.app.common.casbin_rbac import DependsRBAC
|
||||||
|
from backend.app.common.pagination import PageDepends, paging_data
|
||||||
|
from backend.app.common.response.response_schema import response_base
|
||||||
|
from backend.app.database.db_mysql import CurrentSession
|
||||||
|
from backend.app.schemas.dict_data import GetAllDictData, CreateDictData, UpdateDictData
|
||||||
|
from backend.app.services.dict_data_service import DictDataService
|
||||||
|
from backend.app.utils.serializers import select_to_json
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get('/{pk}', summary='获取字典详情', dependencies=[DependsRBAC])
|
||||||
|
async def get_dict_data(pk: int):
|
||||||
|
dict_data = await DictDataService.get(pk=pk)
|
||||||
|
data = GetAllDictData(**select_to_json(dict_data))
|
||||||
|
return await response_base.success(data=data)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get('', summary='(模糊条件)分页获取所有字典', dependencies=[DependsRBAC, PageDepends])
|
||||||
|
async def get_all_dict_datas(
|
||||||
|
db: CurrentSession,
|
||||||
|
label: Annotated[str | None, Query()] = None,
|
||||||
|
value: Annotated[str | None, Query()] = None,
|
||||||
|
status: Annotated[str | None, Query()] = None,
|
||||||
|
):
|
||||||
|
dict_data_select = await DictDataService.get_select(label=label, value=value, status=status)
|
||||||
|
page_data = await paging_data(db, dict_data_select, GetAllDictData)
|
||||||
|
return await response_base.success(data=page_data)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post('', summary='创建字典', dependencies=[DependsRBAC])
|
||||||
|
async def create_dict_data(request: Request, obj: CreateDictData):
|
||||||
|
await DictDataService.create(obj=obj, user_id=request.user.id)
|
||||||
|
return await response_base.success()
|
||||||
|
|
||||||
|
|
||||||
|
@router.put('/{pk}', summary='更新字典', dependencies=[DependsRBAC])
|
||||||
|
async def update_dict_data(request: Request, pk: int, obj: UpdateDictData):
|
||||||
|
count = await DictDataService.update(pk=pk, obj=obj, user_id=request.user.id)
|
||||||
|
if count > 0:
|
||||||
|
return await response_base.success()
|
||||||
|
return await response_base.fail()
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete('', summary='(批量)删除字典', dependencies=[DependsRBAC])
|
||||||
|
async def delete_dict_data(pk: Annotated[list[int], Query(...)]):
|
||||||
|
count = await DictDataService.delete(pk=pk)
|
||||||
|
if count > 0:
|
||||||
|
return await response_base.success()
|
||||||
|
return await response_base.fail()
|
48
backend/app/api/v1/dict_type.py
Normal file
48
backend/app/api/v1/dict_type.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from typing import Annotated
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Query, Request
|
||||||
|
|
||||||
|
from backend.app.common.casbin_rbac import DependsRBAC
|
||||||
|
from backend.app.common.pagination import PageDepends, paging_data
|
||||||
|
from backend.app.common.response.response_schema import response_base
|
||||||
|
from backend.app.database.db_mysql import CurrentSession
|
||||||
|
from backend.app.schemas.dict_type import GetAllDictType, CreateDictType, UpdateDictType
|
||||||
|
from backend.app.services.dict_type_service import DictTypeService
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get('', summary='(模糊条件)分页获取所有字典类型', dependencies=[DependsRBAC, PageDepends])
|
||||||
|
async def get_all_dict_types(
|
||||||
|
db: CurrentSession,
|
||||||
|
name: Annotated[str | None, Query()] = None,
|
||||||
|
code: Annotated[str | None, Query()] = None,
|
||||||
|
status: Annotated[str | None, Query()] = None,
|
||||||
|
):
|
||||||
|
dict_type_select = await DictTypeService.get_select(name=name, code=code, status=status)
|
||||||
|
page_data = await paging_data(db, dict_type_select, GetAllDictType)
|
||||||
|
return await response_base.success(data=page_data)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post('', summary='创建字典类型', dependencies=[DependsRBAC])
|
||||||
|
async def create_dict_type(request: Request, obj: CreateDictType):
|
||||||
|
await DictTypeService.create(obj=obj, user_id=request.user.id)
|
||||||
|
return await response_base.success()
|
||||||
|
|
||||||
|
|
||||||
|
@router.put('/{pk}', summary='更新字典类型', dependencies=[DependsRBAC])
|
||||||
|
async def update_dict_type(request: Request, pk: int, obj: UpdateDictType):
|
||||||
|
count = await DictTypeService.update(pk=pk, obj=obj, user_id=request.user.id)
|
||||||
|
if count > 0:
|
||||||
|
return await response_base.success()
|
||||||
|
return await response_base.fail()
|
||||||
|
|
||||||
|
|
||||||
|
@router.delete('', summary='(批量)删除字典类型', dependencies=[DependsRBAC])
|
||||||
|
async def delete_dict_type(pk: Annotated[list[int], Query(...)]):
|
||||||
|
count = await DictTypeService.delete(pk=pk)
|
||||||
|
if count > 0:
|
||||||
|
return await response_base.success()
|
||||||
|
return await response_base.fail()
|
49
backend/app/crud/crud_dict_data.py
Normal file
49
backend/app/crud/crud_dict_data.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import select, Select, desc, and_, delete
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
|
from backend.app.crud.base import CRUDBase
|
||||||
|
from backend.app.models.sys_dict_data import DictData
|
||||||
|
from backend.app.schemas.dict_data import CreateDictData, UpdateDictData
|
||||||
|
|
||||||
|
|
||||||
|
class CRUDDictData(CRUDBase[DictData, CreateDictData, UpdateDictData]):
|
||||||
|
async def get(self, db: AsyncSession, pk: int) -> DictData | None:
|
||||||
|
return await self.get_(db, pk=pk)
|
||||||
|
|
||||||
|
async def get_all(self, label: str = None, value: str = None, status: int = None) -> Select:
|
||||||
|
se = select(self.model).options(selectinload(self.model.type)).order_by(desc(self.model.sort))
|
||||||
|
where_list = []
|
||||||
|
if label:
|
||||||
|
where_list.append(self.model.label.like(f'%{label}%'))
|
||||||
|
if value:
|
||||||
|
where_list.append(self.model.value.like(f'%{value}%'))
|
||||||
|
if status is not None:
|
||||||
|
where_list.append(self.model.status == status)
|
||||||
|
if where_list:
|
||||||
|
se = se.where(and_(*where_list))
|
||||||
|
return se
|
||||||
|
|
||||||
|
async def get_by_label(self, db: AsyncSession, label: str) -> DictData | None:
|
||||||
|
api = await db.execute(select(self.model).where(self.model.label == label))
|
||||||
|
return api.scalars().first()
|
||||||
|
|
||||||
|
async def create(self, db: AsyncSession, obj_in: CreateDictData, user_id: int) -> None:
|
||||||
|
await self.create_(db, obj_in, user_id)
|
||||||
|
|
||||||
|
async def update(self, db: AsyncSession, pk: int, obj_in: UpdateDictData, user_id: int) -> int:
|
||||||
|
return await self.update_(db, pk, obj_in, user_id)
|
||||||
|
|
||||||
|
async def delete(self, db: AsyncSession, pk: list[int]) -> int:
|
||||||
|
apis = await db.execute(delete(self.model).where(self.model.id.in_(pk)))
|
||||||
|
return apis.rowcount
|
||||||
|
|
||||||
|
async def get_with_relation(self, db: AsyncSession, pk: int) -> DictData | None:
|
||||||
|
where = [self.model.id == pk]
|
||||||
|
dict_data = await db.execute(select(self.model).options(selectinload(self.model.type)).where(*where))
|
||||||
|
return dict_data.scalars().first()
|
||||||
|
|
||||||
|
|
||||||
|
DictDataDao: CRUDDictData = CRUDDictData(DictData)
|
43
backend/app/crud/crud_dict_type.py
Normal file
43
backend/app/crud/crud_dict_type.py
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import Select, select, desc, delete
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from backend.app.crud.base import CRUDBase
|
||||||
|
from backend.app.models.sys_dict_type import DictType
|
||||||
|
from backend.app.schemas.dict_type import CreateDictType, UpdateDictType
|
||||||
|
|
||||||
|
|
||||||
|
class CRUDDictType(CRUDBase[DictType, CreateDictType, UpdateDictType]):
|
||||||
|
async def get(self, db: AsyncSession, pk: int) -> DictType | None:
|
||||||
|
return await self.get_(db, pk=pk)
|
||||||
|
|
||||||
|
async def get_all(self, *, name: str = None, code: str = None, status: int = None) -> Select:
|
||||||
|
se = select(self.model).order_by(desc(self.model.created_time))
|
||||||
|
where_list = []
|
||||||
|
if name:
|
||||||
|
where_list.append(self.model.name.like(f'%{name}%'))
|
||||||
|
if code:
|
||||||
|
where_list.append(self.model.code.like(f'%{code}%'))
|
||||||
|
if status is not None:
|
||||||
|
where_list.append(self.model.status == status)
|
||||||
|
if where_list:
|
||||||
|
se = se.where(*where_list)
|
||||||
|
return se
|
||||||
|
|
||||||
|
async def get_by_code(self, db: AsyncSession, code: str) -> DictType | None:
|
||||||
|
dept = await db.execute(select(self.model).where(self.model.code == code))
|
||||||
|
return dept.scalars().first()
|
||||||
|
|
||||||
|
async def create(self, db: AsyncSession, obj_in: CreateDictType, user_id: int) -> None:
|
||||||
|
await self.create_(db, obj_in, user_id)
|
||||||
|
|
||||||
|
async def update(self, db: AsyncSession, pk: int, obj_in: UpdateDictType, user_id: int) -> int:
|
||||||
|
return await self.update_(db, pk, obj_in, user_id)
|
||||||
|
|
||||||
|
async def delete(self, db: AsyncSession, pk: list[int]) -> int:
|
||||||
|
apis = await db.execute(delete(self.model).where(self.model.id.in_(pk)))
|
||||||
|
return apis.rowcount
|
||||||
|
|
||||||
|
|
||||||
|
DictTypeDao: CRUDDictType = CRUDDictType(DictType)
|
@ -13,3 +13,5 @@ from backend.app.models.sys_role import Role
|
|||||||
from backend.app.models.sys_user import User
|
from backend.app.models.sys_user import User
|
||||||
from backend.app.models.sys_login_log import LoginLog
|
from backend.app.models.sys_login_log import LoginLog
|
||||||
from backend.app.models.sys_opera_log import OperaLog
|
from backend.app.models.sys_opera_log import OperaLog
|
||||||
|
from backend.app.models.sys_dict_type import DictType
|
||||||
|
from backend.app.models.sys_dict_data import DictData
|
||||||
|
23
backend/app/models/sys_dict_data.py
Normal file
23
backend/app/models/sys_dict_data.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import String, ForeignKey
|
||||||
|
from sqlalchemy.dialects.mysql import LONGTEXT
|
||||||
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||||
|
|
||||||
|
from backend.app.database.base_class import Base, id_key
|
||||||
|
|
||||||
|
|
||||||
|
class DictData(Base):
|
||||||
|
"""字典数据"""
|
||||||
|
|
||||||
|
__tablename__ = 'sys_dict_data'
|
||||||
|
|
||||||
|
id: Mapped[id_key] = mapped_column(init=False)
|
||||||
|
label: Mapped[str] = mapped_column(String(32), unique=True, comment='字典标签')
|
||||||
|
value: Mapped[str] = mapped_column(String(32), unique=True, comment='字典值')
|
||||||
|
type_id: Mapped[int] = mapped_column(ForeignKey('sys_dict_type.id'), comment='字典类型id')
|
||||||
|
sort: Mapped[int] = mapped_column(default=0, comment='排序')
|
||||||
|
status: Mapped[bool] = mapped_column(default=True, comment='状态(0停用 1正常)')
|
||||||
|
remark: Mapped[str | None] = mapped_column(LONGTEXT, default=None, comment='备注')
|
||||||
|
# 字典类型一对多
|
||||||
|
type: Mapped['DictType'] = relationship(init=False, back_populates='datas') # noqa: F821
|
21
backend/app/models/sys_dict_type.py
Normal file
21
backend/app/models/sys_dict_type.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import String
|
||||||
|
from sqlalchemy.dialects.mysql import LONGTEXT
|
||||||
|
from sqlalchemy.orm import Mapped, mapped_column, relationship
|
||||||
|
|
||||||
|
from backend.app.database.base_class import Base, id_key
|
||||||
|
|
||||||
|
|
||||||
|
class DictType(Base):
|
||||||
|
"""字典类型"""
|
||||||
|
|
||||||
|
__tablename__ = 'sys_dict_type'
|
||||||
|
|
||||||
|
id: Mapped[id_key] = mapped_column(init=False)
|
||||||
|
name: Mapped[str] = mapped_column(String(32), unique=True, comment='字典类型名称')
|
||||||
|
code: Mapped[str] = mapped_column(String(32), unique=True, comment='字典类型编码')
|
||||||
|
status: Mapped[bool] = mapped_column(default=True, comment='状态(0停用 1正常)')
|
||||||
|
remark: Mapped[str | None] = mapped_column(LONGTEXT, default=None, comment='备注')
|
||||||
|
# 字典类型一对多
|
||||||
|
datas: Mapped[list['DictData']] = relationship(init=False, back_populates='type') # noqa: F821
|
36
backend/app/schemas/dict_data.py
Normal file
36
backend/app/schemas/dict_data.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from backend.app.schemas.dict_type import GetAllDictType
|
||||||
|
|
||||||
|
|
||||||
|
class DictDataBase(BaseModel):
|
||||||
|
label: str
|
||||||
|
value: str
|
||||||
|
sort: int
|
||||||
|
status: bool
|
||||||
|
remark: str | None = None
|
||||||
|
type_id: int
|
||||||
|
|
||||||
|
|
||||||
|
class CreateDictData(DictDataBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateDictData(DictDataBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class GetAllDictData(DictDataBase):
|
||||||
|
id: int
|
||||||
|
type: GetAllDictType
|
||||||
|
create_user: int
|
||||||
|
update_user: int = None
|
||||||
|
created_time: datetime
|
||||||
|
updated_time: datetime | None = None
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
orm_mode = True
|
31
backend/app/schemas/dict_type.py
Normal file
31
backend/app/schemas/dict_type.py
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
|
||||||
|
class DictTypeBase(BaseModel):
|
||||||
|
name: str
|
||||||
|
code: str
|
||||||
|
status: bool
|
||||||
|
remark: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class CreateDictType(DictTypeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UpdateDictType(DictTypeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class GetAllDictType(DictTypeBase):
|
||||||
|
id: int
|
||||||
|
create_user: int
|
||||||
|
update_user: int = None
|
||||||
|
created_time: datetime
|
||||||
|
updated_time: datetime | None = None
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
orm_mode = True
|
56
backend/app/services/dict_data_service.py
Normal file
56
backend/app/services/dict_data_service.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import Select
|
||||||
|
|
||||||
|
from backend.app.common.exception import errors
|
||||||
|
from backend.app.crud.crud_dict_data import DictDataDao
|
||||||
|
from backend.app.crud.crud_dict_type import DictTypeDao
|
||||||
|
from backend.app.database.db_mysql import async_db_session
|
||||||
|
from backend.app.models.sys_dict_data import DictData
|
||||||
|
from backend.app.schemas.dict_data import CreateDictData, UpdateDictData
|
||||||
|
|
||||||
|
|
||||||
|
class DictDataService:
|
||||||
|
@staticmethod
|
||||||
|
async def get(*, pk: int) -> DictData:
|
||||||
|
async with async_db_session() as db:
|
||||||
|
dict_data = await DictDataDao.get_with_relation(db, pk)
|
||||||
|
if not dict_data:
|
||||||
|
raise errors.NotFoundError(msg='字典数据不存在')
|
||||||
|
return dict_data
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def get_select(*, label: str = None, value: str = None, status: int = None) -> Select:
|
||||||
|
return await DictDataDao.get_all(label=label, value=value, status=status)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def create(*, obj: CreateDictData, user_id: int) -> None:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
dict_data = await DictDataDao.get_by_label(db, obj.label)
|
||||||
|
if dict_data:
|
||||||
|
raise errors.ForbiddenError(msg='字典数据已存在')
|
||||||
|
dict_type = await DictTypeDao.get(db, obj.type_id)
|
||||||
|
if not dict_type:
|
||||||
|
raise errors.ForbiddenError(msg='字典类型不存在')
|
||||||
|
await DictDataDao.create(db, obj, user_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def update(*, pk: int, obj: UpdateDictData, user_id: int) -> int:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
dict_data = await DictDataDao.get(db, pk)
|
||||||
|
if not dict_data:
|
||||||
|
raise errors.NotFoundError(msg='字典数据不存在')
|
||||||
|
if dict_data.label != obj.label:
|
||||||
|
if await DictDataDao.get_by_label(db, obj.label):
|
||||||
|
raise errors.ForbiddenError(msg='字典数据已存在')
|
||||||
|
dict_type = await DictTypeDao.get(db, obj.type_id)
|
||||||
|
if not dict_type:
|
||||||
|
raise errors.ForbiddenError(msg='字典类型不存在')
|
||||||
|
count = await DictDataDao.update(db, pk, obj, user_id)
|
||||||
|
return count
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def delete(*, pk: list[int]) -> int:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
count = await DictDataDao.delete(db, pk)
|
||||||
|
return count
|
40
backend/app/services/dict_type_service.py
Normal file
40
backend/app/services/dict_type_service.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from sqlalchemy import Select
|
||||||
|
|
||||||
|
from backend.app.common.exception import errors
|
||||||
|
from backend.app.crud.crud_dict_type import DictTypeDao
|
||||||
|
from backend.app.database.db_mysql import async_db_session
|
||||||
|
from backend.app.schemas.dict_type import CreateDictType, UpdateDictType
|
||||||
|
|
||||||
|
|
||||||
|
class DictTypeService:
|
||||||
|
@staticmethod
|
||||||
|
async def get_select(*, name: str = None, code: str = None, status: int = None) -> Select:
|
||||||
|
return await DictTypeDao.get_all(name=name, code=code, status=status)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def create(*, obj: CreateDictType, user_id: int) -> None:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
dict_type = await DictTypeDao.get_by_code(db, obj.code)
|
||||||
|
if dict_type:
|
||||||
|
raise errors.ForbiddenError(msg='字典类型已存在')
|
||||||
|
await DictTypeDao.create(db, obj, user_id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def update(*, pk: int, obj: UpdateDictType, user_id: int) -> int:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
dict_type = await DictTypeDao.get(db, pk)
|
||||||
|
if not dict_type:
|
||||||
|
raise errors.NotFoundError(msg='字典类型不存在')
|
||||||
|
if dict_type.code != obj.code:
|
||||||
|
if await DictTypeDao.get_by_code(db, obj.code):
|
||||||
|
raise errors.ForbiddenError(msg='字典类型已存在')
|
||||||
|
count = await DictTypeDao.update(db, pk, obj, user_id)
|
||||||
|
return count
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def delete(*, pk: list[int]) -> int:
|
||||||
|
async with async_db_session.begin() as db:
|
||||||
|
count = await DictTypeDao.delete(db, pk)
|
||||||
|
return count
|
Reference in New Issue
Block a user