# Mirror of https://github.com/fastapi-users/fastapi-users.git
# (synced 2025-11-04 14:45:50 +08:00)
# Upstream file stats at sync time: 67 lines, 2.2 KiB, Python
from typing import List, Optional

from databases import Database
from sqlalchemy import Boolean, Column, String, Table

from fastapi_users.db.base import BaseUserDatabase
from fastapi_users.models import BaseUserDB


class SQLAlchemyBaseUserTable:
    """Base SQLAlchemy users table definition.

    Declares the columns of the ``user`` table as class attributes. The class
    has no base class of its own.
    NOTE(review): presumably meant to be combined with a SQLAlchemy
    declarative base by the application -- confirm against callers.
    """

    __tablename__ = "user"

    # String primary key.
    id = Column(String, primary_key=True)
    # Required, unique and indexed.
    email = Column(String, unique=True, index=True, nullable=False)
    # Required; stored as a plain string column.
    hashed_password = Column(String, nullable=False)
    # Required flags with column-level defaults (active, not superuser).
    is_active = Column(Boolean, default=True, nullable=False)
    is_superuser = Column(Boolean, default=False, nullable=False)


class SQLAlchemyUserDatabase(BaseUserDatabase):
    """
    Database adapter for SQLAlchemy.

    Builds statements against the users table and runs them through the
    `Database` instance; fetched rows are converted to `BaseUserDB` models.

    :param database: `Database` instance from `encode/databases`.
    :param users: SQLAlchemy users table instance.
    """

    database: Database
    users: Table

    def __init__(self, database: Database, users: Table):
        self.database = database
        self.users = users

    async def list(self) -> List[BaseUserDB]:
        """
        Fetch every row of the users table.

        :return: One `BaseUserDB` per fetched row.
        """
        rows = await self.database.fetch_all(self.users.select())
        return [BaseUserDB(**row) for row in rows]

    async def get(self, id: str) -> Optional[BaseUserDB]:
        """
        Fetch a single user by id.

        :param id: Value compared against the table's `id` column.
        :return: The matching user, or `None` when no row was fetched.
        """
        query = self.users.select().where(self.users.c.id == id)
        row = await self.database.fetch_one(query)
        # Test for "no row" explicitly instead of relying on the truthiness
        # of the row object.
        if row is None:
            return None
        return BaseUserDB(**row)

    async def get_by_email(self, email: str) -> Optional[BaseUserDB]:
        """
        Fetch a single user by email.

        :param email: Value compared against the table's `email` column.
        :return: The matching user, or `None` when no row was fetched.
        """
        query = self.users.select().where(self.users.c.email == email)
        row = await self.database.fetch_one(query)
        # Test for "no row" explicitly instead of relying on the truthiness
        # of the row object.
        if row is None:
            return None
        return BaseUserDB(**row)

    async def create(self, user: BaseUserDB) -> BaseUserDB:
        """
        Insert a new row built from `user.dict()`.

        :param user: User model whose fields become the inserted values.
        :return: The same `user` object that was passed in.
        """
        statement = self.users.insert().values(**user.dict())
        await self.database.execute(statement)
        return user

    async def update(self, user: BaseUserDB) -> BaseUserDB:
        """
        Overwrite the row whose id equals `user.id` with `user.dict()`.

        The statement's result is not inspected, so this method does not
        report whether any row actually matched.

        :param user: User model carrying the id to match and the new values.
        :return: The same `user` object that was passed in.
        """
        statement = (
            self.users.update()
            .where(self.users.c.id == user.id)
            .values(**user.dict())
        )
        await self.database.execute(statement)
        return user

    async def delete(self, user: BaseUserDB) -> None:
        """
        Delete the row whose id equals `user.id`.

        :param user: User model carrying the id to match.
        """
        statement = self.users.delete().where(self.users.c.id == user.id)
        await self.database.execute(statement)