mirror of
https://github.com/fastapi-practices/fastapi_best_architecture.git
synced 2025-08-16 12:52:38 +08:00

* define the basic architecture
* Update script and deployment file locations
* Update the route registration
* Fix CI download dependencies
* Updated ruff to 0.3.3
* Update app subdirectory naming
* Update the model import
* fix pre-commit pdm lock
* Update the service directory naming
* Add CRUD method documents
* Fix the issue of circular import
* Update the README document
* Update the SQL statement for create tables
* Update docker scripts and documentation
* Fix docker scripts
* Update the backend README.md
* Add the security folder and move the redis client
* Update the configuration item
* Fix environment configuration reads
* Update the default configuration
* Updated README description
* Updated the user registration API
* Fix test cases
* Update the celery configuration
* Update and fix celery configuration
* Updated the celery structure
* Update celery tasks and api
* Add celery flower
* Update the import style
* Update contributors
37 lines
965 B
Python
37 lines
965 B
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
from math import ceil
|
|
|
|
from fastapi import FastAPI, Request, Response
|
|
from fastapi.routing import APIRoute
|
|
|
|
from backend.common.exception import errors
|
|
|
|
|
|
def ensure_unique_route_names(app: FastAPI) -> None:
    """
    Check that every API route registered on the application has a unique name

    :param app: application whose ``routes`` are inspected
    :return:
    :raises ValueError: when an ``APIRoute`` name has already been seen
    """
    seen_names = set()
    for route in app.routes:
        # Only APIRoute entries take part in the uniqueness check.
        if not isinstance(route, APIRoute):
            continue
        if route.name in seen_names:
            raise ValueError(f'Non-unique route name: {route.name}')
        seen_names.add(route.name)
|
|
|
|
|
|
async def http_limit_callback(request: Request, response: Response, expire: int):
    """
    Default callback used when a request is rate limited

    :param request: the incoming request (not read by this callback)
    :param response: the outgoing response (not read by this callback)
    :param expire: remaining milliseconds
    :return:
    :raises errors.HTTPError: always, with code 429 and a ``Retry-After`` header
    """
    # Convert the remaining milliseconds to whole seconds, rounding up.
    retry_headers = {'Retry-After': str(ceil(expire / 1000))}
    raise errors.HTTPError(code=429, msg='请求过于频繁,请稍后重试', headers=retry_headers)
|