Files
IAseven e09062eb39 Optimize the opera log storage logic through queue (#750)
*  feat: 操作日志中间件添加批量插入功能

* Delete GEMINI.md

* 🌈 style: 修复格式化错误

* 🐞 fix: 通过asyncio.wait_for兼容py3.10中asyncio.timeout不存在

* 🦄 refactor: 重新组织操作日志批量插入代码逻辑

* 优化代码实现

* 恢复默认配置

* 恢复默认 .gitignore 文件

* 更新队列批处理逻辑
2025-08-07 17:36:10 +08:00

194 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from asyncio import create_task
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import socketio
from asgi_correlation_id import CorrelationIdMiddleware
from fastapi import Depends, FastAPI
from fastapi_limiter import FastAPILimiter
from fastapi_pagination import add_pagination
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.staticfiles import StaticFiles
from backend.common.exception.exception_handler import register_exception
from backend.common.log import set_custom_logfile, setup_logging
from backend.core.conf import settings
from backend.core.path_conf import STATIC_DIR, UPLOAD_DIR
from backend.database.db import create_tables
from backend.database.redis import redis_client
from backend.middleware.access_middleware import AccessMiddleware
from backend.middleware.jwt_auth_middleware import JwtAuthMiddleware
from backend.middleware.opera_log_middleware import OperaLogMiddleware
from backend.middleware.state_middleware import StateMiddleware
from backend.plugin.tools import build_final_router
from backend.utils.demo_site import demo_site
from backend.utils.health_check import ensure_unique_route_names, http_limit_callback
from backend.utils.openapi import simplify_operation_ids
from backend.utils.serializers import MsgSpecJSONResponse
@asynccontextmanager
async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
"""
启动初始化
:param app: FastAPI 应用实例
:return:
"""
# 创建数据库表
await create_tables()
# 初始化 limiter
await FastAPILimiter.init(
redis=redis_client,
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
# 创建操作日志任务
create_task(OperaLogMiddleware.consumer())
yield
# 关闭 redis 连接
await redis_client.close()
# 关闭 limiter
await FastAPILimiter.close()
def register_app() -> FastAPI:
"""注册 FastAPI 应用"""
app = FastAPI(
title=settings.FASTAPI_TITLE,
version=settings.FASTAPI_VERSION,
description=settings.FASTAPI_DESCRIPTION,
docs_url=settings.FASTAPI_DOCS_URL,
redoc_url=settings.FASTAPI_REDOC_URL,
openapi_url=settings.FASTAPI_OPENAPI_URL,
default_response_class=MsgSpecJSONResponse,
lifespan=register_init,
)
# 注册组件
register_logger()
register_socket_app(app)
register_static_file(app)
register_middleware(app)
register_router(app)
register_page(app)
register_exception(app)
return app
def register_logger() -> None:
"""注册日志"""
setup_logging()
set_custom_logfile()
def register_static_file(app: FastAPI) -> None:
"""
注册静态资源服务
:param app: FastAPI 应用实例
:return:
"""
# 上传静态资源
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
app.mount('/static/upload', StaticFiles(directory=UPLOAD_DIR), name='upload')
# 固有静态资源
if settings.FASTAPI_STATIC_FILES:
app.mount('/static', StaticFiles(directory=STATIC_DIR), name='static')
def register_middleware(app: FastAPI) -> None:
"""
注册中间件(执行顺序从下往上)
:param app: FastAPI 应用实例
:return:
"""
# Opera log
app.add_middleware(OperaLogMiddleware)
# State
app.add_middleware(StateMiddleware)
# JWT auth
app.add_middleware(
AuthenticationMiddleware,
backend=JwtAuthMiddleware(),
on_error=JwtAuthMiddleware.auth_exception_handler,
)
# CORS
if settings.MIDDLEWARE_CORS:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
expose_headers=settings.CORS_EXPOSE_HEADERS,
)
# Access log
app.add_middleware(AccessMiddleware)
# Trace ID
app.add_middleware(CorrelationIdMiddleware, validator=False)
def register_router(app: FastAPI) -> None:
"""
注册路由
:param app: FastAPI 应用实例
:return:
"""
dependencies = [Depends(demo_site)] if settings.DEMO_MODE else None
# API
router = build_final_router()
app.include_router(router, dependencies=dependencies)
# Extra
ensure_unique_route_names(app)
simplify_operation_ids(app)
def register_page(app: FastAPI) -> None:
"""
注册分页查询功能
:param app: FastAPI 应用实例
:return:
"""
add_pagination(app)
def register_socket_app(app: FastAPI) -> None:
"""
注册 Socket.IO 应用
:param app: FastAPI 应用实例
:return:
"""
from backend.common.socketio.server import sio
socket_app = socketio.ASGIApp(
socketio_server=sio,
other_asgi_app=app,
# 切勿删除此配置https://github.com/pyropy/fastapi-socketio/issues/51
socketio_path='/ws/socket.io',
)
app.mount('/ws', socket_app)