Files
Wu Clan 5e438c685d Refactor the backend architecture (#299)
* define the basic architecture

* Update script and deployment file locations

* Update the route registration

* Fix CI download dependencies

* Updated ruff to 0.3.3

* Update app subdirectory naming

* Update the model import

* fix pre-commit pdm lock

* Update the service directory naming

* Add CRUD method documents

* Fix the issue of circular import

* Update the README document

* Update the SQL statement for create tables

* Update docker scripts and documentation

* Fix docker scripts

* Update the backend README.md

* Add the security folder and move the redis client

* Update the configuration item

* Fix environment configuration reads

* Update the default configuration

* Updated README description

* Updated the user registration API

* Fix test cases

* Update the celery configuration

* Update and fix celery configuration

* Updated the celery structure

* Update celery tasks and api

* Add celery flower

* Update the import style

* Update contributors
2024-03-22 18:16:15 +08:00

111 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from typing import Any
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from itsdangerous import URLSafeSerializer
from backend.common.log import log
class AESCipher:
def __init__(self, key: bytes | str):
"""
:param key: 密钥16/24/32 bytes 或 16 进制字符串
"""
self.key = key if isinstance(key, bytes) else bytes.fromhex(key)
def encrypt(self, plaintext: bytes | str) -> bytes:
"""
AES 加密
:param plaintext: 加密前的明文
:return:
"""
if not isinstance(plaintext, bytes):
plaintext = str(plaintext).encode('utf-8')
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=backend)
encryptor = cipher.encryptor()
padder = padding.PKCS7(cipher.algorithm.block_size).padder() # type: ignore
padded_plaintext = padder.update(plaintext) + padder.finalize()
ciphertext = encryptor.update(padded_plaintext) + encryptor.finalize()
return iv + ciphertext
def decrypt(self, ciphertext: bytes | str) -> str:
"""
AES 解密
:param ciphertext: 解密前的密文, bytes 或 16 进制字符串
:return:
"""
ciphertext = ciphertext if isinstance(ciphertext, bytes) else bytes.fromhex(ciphertext)
iv = ciphertext[:16]
ciphertext = ciphertext[16:]
cipher = Cipher(algorithms.AES(self.key), modes.CBC(iv), backend=backend)
decryptor = cipher.decryptor()
unpadder = padding.PKCS7(cipher.algorithm.block_size).unpadder() # type: ignore
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()
plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()
return plaintext.decode('utf-8')
class Md5Cipher:
@staticmethod
def encrypt(plaintext: bytes | str) -> str:
"""
MD5 加密
:param plaintext: 加密前的明文
:return:
"""
import hashlib
md5 = hashlib.md5()
if not isinstance(plaintext, bytes):
plaintext = str(plaintext).encode('utf-8')
md5.update(plaintext)
return md5.hexdigest()
class ItsDCipher:
def __init__(self, key: bytes | str):
"""
:param key: 密钥16/24/32 bytes 或 16 进制字符串
"""
self.key = key if isinstance(key, bytes) else bytes.fromhex(key)
def encrypt(self, plaintext: Any) -> str:
"""
ItsDangerous 加密 (可能失败,如果 plaintext 无法序列化,则会加密为 MD5)
:param plaintext: 加密前的明文
:return:
"""
serializer = URLSafeSerializer(self.key)
try:
ciphertext = serializer.dumps(plaintext)
except Exception as e:
log.error(f'ItsDangerous encrypt failed: {e}')
ciphertext = Md5Cipher.encrypt(plaintext)
return ciphertext
def decrypt(self, ciphertext: str) -> Any:
"""
ItsDangerous 解密 (可能失败,如果 ciphertext 无法反序列化,则解密失败, 返回原始密文)
:param ciphertext: 解密前的密文
:return:
"""
serializer = URLSafeSerializer(self.key)
try:
plaintext = serializer.loads(ciphertext)
except Exception as e:
log.error(f'ItsDangerous decrypt failed: {e}')
plaintext = ciphertext
return plaintext