mirror of
https://github.com/fastapi-practices/fastapi_best_architecture.git
synced 2025-08-26 04:33:09 +08:00
Add postgresql database support (#475)
* feat: add postgresql db supports * change: change mysql conn str create way * fix: Modify the default alembic migration file to meet multi-database support * Update settings and lint * update models * Simplify database config * Simplify the get db method * Update create db url * Updated model type adaptation * Update sql scripts * Fix models type * Adaptation to postgresql code generation * Update README.md * Fix alembic file template * Update docker scripts
This commit is contained in:
@ -1,5 +1,3 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
|
||||
from typing import Annotated
|
||||
@ -14,10 +12,31 @@ from backend.common.model import MappedBase
|
||||
from backend.core.conf import settings
|
||||
|
||||
|
||||
def create_database_url(unittest: bool = False) -> URL:
|
||||
"""
|
||||
创建数据库链接
|
||||
|
||||
:param unittest: 是否用于单元测试
|
||||
:return:
|
||||
"""
|
||||
url = URL.create(
|
||||
drivername='mysql+asyncmy' if settings.DATABASE_TYPE == 'mysql' else 'postgresql+asyncpg',
|
||||
username=settings.DATABASE_USER,
|
||||
password=settings.DATABASE_PASSWORD,
|
||||
host=settings.DATABASE_HOST,
|
||||
port=settings.DATABASE_PORT,
|
||||
database=settings.DATABASE_SCHEMA if not unittest else f'{settings.DATABASE_SCHEMA}_test',
|
||||
)
|
||||
if settings.DATABASE_TYPE == 'mysql':
|
||||
url.update_query_dict({'charset': settings.DATABASE_CHARSET})
|
||||
return url
|
||||
|
||||
|
||||
def create_engine_and_session(url: str | URL):
|
||||
"""创建数据库引擎和 Session"""
|
||||
try:
|
||||
# 数据库引擎
|
||||
engine = create_async_engine(url, echo=settings.MYSQL_ECHO, future=True, pool_pre_ping=True)
|
||||
engine = create_async_engine(url, echo=settings.DATABASE_ECHO, future=True, pool_pre_ping=True)
|
||||
# log.success('数据库连接成功')
|
||||
except Exception as e:
|
||||
log.error('❌ 数据库链接失败 {}', e)
|
||||
@ -27,28 +46,10 @@ def create_engine_and_session(url: str | URL):
|
||||
return engine, db_session
|
||||
|
||||
|
||||
SQLALCHEMY_DATABASE_URL = (
|
||||
f'mysql+asyncmy://{settings.MYSQL_USER}:{settings.MYSQL_PASSWORD}@{settings.MYSQL_HOST}:'
|
||||
f'{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}?charset={settings.MYSQL_CHARSET}'
|
||||
)
|
||||
|
||||
async_engine, async_db_session = create_engine_and_session(SQLALCHEMY_DATABASE_URL)
|
||||
|
||||
|
||||
async def get_db() -> AsyncSession:
|
||||
async def get_db():
|
||||
"""session 生成器"""
|
||||
session = async_db_session()
|
||||
try:
|
||||
async with async_db_session() as session:
|
||||
yield session
|
||||
except Exception as se:
|
||||
await session.rollback()
|
||||
raise se
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
|
||||
# Session Annotated
|
||||
CurrentSession = Annotated[AsyncSession, Depends(get_db)]
|
||||
|
||||
|
||||
async def create_table():
|
||||
@ -60,3 +61,9 @@ async def create_table():
|
||||
def uuid4_str() -> str:
|
||||
"""数据库引擎 UUID 类型兼容性解决方案"""
|
||||
return str(uuid4())
|
||||
|
||||
|
||||
SQLALCHEMY_DATABASE_URL = create_database_url()
|
||||
async_engine, async_db_session = create_engine_and_session(SQLALCHEMY_DATABASE_URL)
|
||||
# Session Annotated
|
||||
CurrentSession = Annotated[AsyncSession, Depends(get_db)]
|
Reference in New Issue
Block a user