#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socketio from backend.common.log import log from backend.common.security.jwt import jwt_authentication from backend.core.conf import settings from backend.database.redis import redis_client # 创建 Socket.IO 服务器实例 sio = socketio.AsyncServer( client_manager=socketio.AsyncRedisManager( f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DATABASE}' ), async_mode='asgi', cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS, cors_credentials=True, namespaces=['/ws'], ) @sio.event async def connect(sid, environ, auth): """Socket 连接事件""" if not auth: log.error('WebSocket 连接失败:无授权') return False session_uuid = auth.get('session_uuid') token = auth.get('token') if not token or not session_uuid: log.error('WebSocket 连接失败:授权失败,请检查') return False # 免授权直连 if token == settings.WS_NO_AUTH_MARKER: await redis_client.sadd(settings.TOKEN_ONLINE_REDIS_PREFIX, session_uuid) return True try: await jwt_authentication(token) except Exception as e: log.info(f'WebSocket 连接失败:{str(e)}') return False await redis_client.sadd(settings.TOKEN_ONLINE_REDIS_PREFIX, session_uuid) return True @sio.event async def disconnect(sid) -> None: """Socket 断开连接事件""" await redis_client.spop(settings.TOKEN_ONLINE_REDIS_PREFIX)