mirror of
https://github.com/fastapi-users/fastapi-users.git
synced 2025-08-26 12:31:25 +08:00
Enable RS256/ES256 and other algorithms for JWT strategy (#943)
* Implement RS256 for JWT auth * Update docs with RS256 example * Added ES256 tests * Format with isort and black * Removed example RSA key pair (so as not to tempt people to use it) * Added pyjwt[crypto] to requirements * Removed pycryptodome by hardcoding example keys * Removed unnecessary Tuple import from typing
This commit is contained in:
@ -15,15 +15,39 @@ def get_jwt_strategy() -> JWTStrategy:
|
||||
|
||||
As you can see, instantiation is quite simple. It accepts the following arguments:
|
||||
|
||||
* `secret` (`Union[str, pydantic.SecretStr]`): A constant secret which is used to encode the token. **Use a strong passphrase and keep it secure.**
|
||||
* `lifetime_seconds` (`Optional[int]`): The lifetime of the token in seconds. Can be set to `None` but in this case the token will be valid **forever**; which may raise serious security concerns.
|
||||
* `token_audience` (`Optional[List[str]]`): A list of valid audiences for the JWT token. Defaults to `["fastapi-users:auth"]`.
|
||||
- `secret` (`Union[str, pydantic.SecretStr]`): A constant secret which is used to encode the token. **Use a strong passphrase and keep it secure.**
|
||||
- `lifetime_seconds` (`Optional[int]`): The lifetime of the token in seconds. Can be set to `None` but in this case the token will be valid **forever**; which may raise serious security concerns.
|
||||
- `token_audience` (`Optional[List[str]]`): A list of valid audiences for the JWT token. Defaults to `["fastapi-users:auth"]`.
|
||||
- `algorithm` (`Optional[str]`): The JWT encryption algorithm. See [RFC 7519, section 8](https://datatracker.ietf.org/doc/html/rfc7519#section-8). Defaults to `"HS256"`.
|
||||
- `public_key` (`Optional[Union[str, pydantic.SecretStr]]`): If the JWT encryption algorithm requires a key pair instead of a simple secret, the key to **decrypt** the JWT may be provided here. The `secret` parameter will always be used to **encrypt** the JWT.
|
||||
|
||||
!!! tip "Why it's inside a function?"
|
||||
To allow strategies to be instantiated dynamically with other dependencies, they have to be provided as a callable to the authentication backend.
|
||||
|
||||
For `JWTStrategy`, since it doesn't require dependencies, it can be as simple as the function above.
|
||||
|
||||
## RS256 example
|
||||
|
||||
```py
|
||||
from fastapi_users.authentication import JWTStrategy
|
||||
|
||||
PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
|
||||
# Your RSA public key in PEM format goes here
|
||||
-----END PUBLIC KEY-----"""
|
||||
|
||||
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
|
||||
# Your RSA private key in PEM format goes here
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
|
||||
def get_jwt_strategy() -> JWTStrategy:
|
||||
return JWTStrategy(
|
||||
secret=PRIVATE_KEY,
|
||||
lifetime_seconds=3600,
|
||||
algorithm="RS256",
|
||||
public_key=PUBLIC_KEY,
|
||||
)
|
||||
```
|
||||
|
||||
## Logout
|
||||
|
||||
On logout, this strategy **won't do anything**. Indeed, a JWT can't be invalidated on the server-side: it's valid until it expires.
|
||||
|
@ -18,10 +18,22 @@ class JWTStrategy(Strategy, Generic[models.UC, models.UD]):
|
||||
secret: SecretType,
|
||||
lifetime_seconds: Optional[int],
|
||||
token_audience: List[str] = ["fastapi-users:auth"],
|
||||
algorithm: str = "HS256",
|
||||
public_key: Optional[SecretType] = None,
|
||||
):
|
||||
self.secret = secret
|
||||
self.lifetime_seconds = lifetime_seconds
|
||||
self.token_audience = token_audience
|
||||
self.algorithm = algorithm
|
||||
self.public_key = public_key
|
||||
|
||||
@property
|
||||
def encode_key(self) -> SecretType:
|
||||
return self.secret
|
||||
|
||||
@property
|
||||
def decode_key(self) -> SecretType:
|
||||
return self.public_key or self.secret
|
||||
|
||||
async def read_token(
|
||||
self, token: Optional[str], user_manager: BaseUserManager[models.UC, models.UD]
|
||||
@ -30,7 +42,9 @@ class JWTStrategy(Strategy, Generic[models.UC, models.UD]):
|
||||
return None
|
||||
|
||||
try:
|
||||
data = decode_jwt(token, self.secret, self.token_audience)
|
||||
data = decode_jwt(
|
||||
token, self.decode_key, self.token_audience, algorithms=[self.algorithm]
|
||||
)
|
||||
user_id = data.get("user_id")
|
||||
if user_id is None:
|
||||
return None
|
||||
@ -47,7 +61,9 @@ class JWTStrategy(Strategy, Generic[models.UC, models.UD]):
|
||||
|
||||
async def write_token(self, user: models.UD) -> str:
|
||||
data = {"user_id": str(user.id), "aud": self.token_audience}
|
||||
return generate_jwt(data, self.secret, self.lifetime_seconds)
|
||||
return generate_jwt(
|
||||
data, self.encode_key, self.lifetime_seconds, algorithm=self.algorithm
|
||||
)
|
||||
|
||||
async def destroy_token(self, token: str, user: models.UD) -> None:
|
||||
raise StrategyDestroyNotSupportedError(
|
||||
|
@ -73,7 +73,7 @@ dependencies = [
|
||||
"fastapi >=0.65.2,<0.76.0",
|
||||
"passlib[bcrypt] ==1.7.4",
|
||||
"email-validator >=1.1.0,<1.2",
|
||||
"pyjwt ==2.3.0",
|
||||
"pyjwt[crypto] ==2.3.0",
|
||||
"python-multipart ==0.0.5",
|
||||
"makefun >=1.11.2,<1.14",
|
||||
]
|
||||
|
@ -1,7 +1,7 @@
|
||||
fastapi >=0.65.2,<0.76.0
|
||||
passlib[bcrypt] ==1.7.4
|
||||
email-validator >=1.1.2,<1.2
|
||||
pyjwt ==2.3.0
|
||||
pyjwt[crypto] ==2.3.0
|
||||
python-multipart ==0.0.5
|
||||
makefun >=1.11.2,<1.14
|
||||
|
||||
|
@ -8,23 +8,85 @@ from fastapi_users.jwt import SecretType, decode_jwt, generate_jwt
|
||||
|
||||
LIFETIME = 3600
|
||||
|
||||
ECC_PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
|
||||
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgewlS46hocOLtT9Px
|
||||
M16Y5m68xdRXMq7oSNOYhGc2kIKhRANCAATJ2SfW2ExzQCmMftxII1xLk2Ze+0WA
|
||||
6ZJQA3kAZTdO8uXmCSDkTgizr39VTKSeHgeaR/cOq4/Jr5YsZrjsu0t8
|
||||
-----END PRIVATE KEY-----"""
|
||||
|
||||
@pytest.fixture
|
||||
def jwt_strategy(secret: SecretType):
|
||||
return JWTStrategy(secret, LIFETIME)
|
||||
ECC_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
|
||||
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEydkn1thMc0ApjH7cSCNcS5NmXvtF
|
||||
gOmSUAN5AGU3TvLl5gkg5E4Is69/VUyknh4Hmkf3DquPya+WLGa47LtLfA==
|
||||
-----END PUBLIC KEY-----"""
|
||||
|
||||
RSA_PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEogIBAAKCAQEAvwYzu1C2Oihk533SQ1o1zr3BscLjiw5wkCr9yYYYlK+lb8Ra
|
||||
efln+JauEGhENhlG0JIlKV9KzSisGfcIZAVacdaK0PFAXtKEgZRZL7gVr3GGBPEt
|
||||
y0jdQl52yKhnb1Bf5IrHBeu9jSW9WgyYDpWwop1+c9OF9QL9miO/qzkVtdILNkp4
|
||||
1kvP/PldWQj8vuaxy6dGx9jRHWLFDjRERKkrUraQHs6Fmey1pDTEyN3TYWsKmi7S
|
||||
mpXzgLuClEtSNgAhlwvvDyCZ8SP/SMYZIjIckZlVk+qtvrqKQdqNxJbGLrta5gtB
|
||||
fWDLCDRRFsIrvfZaPsSLhuKQBv9L7ZJazTtSowIDAQABAoIBABxKZNr3ByXx2Y/X
|
||||
OI61C4cE32zeOijcCJuxYki4TWen4857vBKYd2d/mWPgrUl90NkO6+YGsONVhLeL
|
||||
uHhnuo9lgMWVFT113B38xICmuL91Bq4wseGLdwlfSCRLnJYFx03np7YexcHjtvlh
|
||||
KBvw22oZ/SJWT16MBNcROE+5cpestErq61U6G2HpubrVIQJuNe7U9mEGZdyN1eer
|
||||
zRP3eh5on7J25D3/Wtwsf8oOSWCljZ0uGLAqFVVLvxdf/By6TnCtRckCyODQ8/L0
|
||||
rHq7BC5Kc5awbN5DatJEsJfnbOTNXD4dIEYjhXZ0YhtbOeh6pRN3Z7GuIC7tL8Xc
|
||||
JTpKhvECgYEAzu9gnDhqiYVdjYpVWoNh9QPnxSCxvWHh38jLhLffRvXymNCXKsWQ
|
||||
CNtoYyBIMve+TCLNseHN5GfTtGAh0aoWNHZC8FQwIep03y9E439EKXGMKOUuTEyL
|
||||
NlIKuzOl6eIJbRaeTQ5XrIN7DhdNgKFHVC55Z+aultDrl3k3H8Xf4scCgYEA7FEP
|
||||
/nqauzRScdfpExQHVskLEQe4ENvZB7r/phPQ9VnluUtnp+2I6QnN6Ye3+30rBdGk
|
||||
z4gE8l1LfpK3T10qW2d6mFVTfDQ2aUR68nKR+xveEjlq5GiGIJDSA+zimMXAaGrL
|
||||
KFwn/S43X86FQGegyu1OlxGfRbmZ/Xyj8gKQNUUCgYAiDDLKIWIuFFprMmqOxPU2
|
||||
GhllTtbPwS4n4uLRiGtdQpRS3mcw62aifm8zeBlJAYg3ALb1YKC+xvKHSBXoaGLU
|
||||
6OxknIV63xexrRZZlBQD+aHFDMhMV3/ERUVsvbe7vqwsXb9YEFcOlGeHzv+6fU6+
|
||||
JBNnrAXn3KIWvyP5v1Xx+wKBgDD6cBUvNgicxIWh2UXB/e9nxapm7ihYWHf4sump
|
||||
68IeOrWXwkkUuy6JgKrpHSG7hII1PDJjH5tX6MC4CdQiHBhLryYJcT8p1ykkL1M2
|
||||
mbjwwqsGSXhDjaEMQurbWu+M9N7vW2HnD8ayoHlz5Tw+/h1w57v5xAgAesEF5zjO
|
||||
fTL9AoGAEFTAP7v8Kw7+iSjpw5uEzEPJTpTieT7MHoyAlcCkJZOqlrgQDESuliwr
|
||||
gE2YhBBk7IpPKNLttkG0p5JMCxxSoQPz0wsy/VJhuwLPtgH12Df6GFblp7B0RtgX
|
||||
DCGBlAaf+d7Rd/PPf7p5lFSY+e6jOdMk/BNjpyFI73R775qjr5o=
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
|
||||
RSA_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
|
||||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvwYzu1C2Oihk533SQ1o1
|
||||
zr3BscLjiw5wkCr9yYYYlK+lb8Raefln+JauEGhENhlG0JIlKV9KzSisGfcIZAVa
|
||||
cdaK0PFAXtKEgZRZL7gVr3GGBPEty0jdQl52yKhnb1Bf5IrHBeu9jSW9WgyYDpWw
|
||||
op1+c9OF9QL9miO/qzkVtdILNkp41kvP/PldWQj8vuaxy6dGx9jRHWLFDjRERKkr
|
||||
UraQHs6Fmey1pDTEyN3TYWsKmi7SmpXzgLuClEtSNgAhlwvvDyCZ8SP/SMYZIjIc
|
||||
kZlVk+qtvrqKQdqNxJbGLrta5gtBfWDLCDRRFsIrvfZaPsSLhuKQBv9L7ZJazTtS
|
||||
owIDAQAB
|
||||
-----END PUBLIC KEY-----"""
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def token(secret):
|
||||
def jwt_strategy(request, secret: SecretType):
|
||||
if request.param == "HS256":
|
||||
return JWTStrategy(secret, LIFETIME)
|
||||
elif request.param == "RS256":
|
||||
return JWTStrategy(
|
||||
RSA_PRIVATE_KEY, LIFETIME, algorithm="RS256", public_key=RSA_PUBLIC_KEY
|
||||
)
|
||||
elif request.param == "ES256":
|
||||
return JWTStrategy(
|
||||
ECC_PRIVATE_KEY, LIFETIME, algorithm="ES256", public_key=ECC_PUBLIC_KEY
|
||||
)
|
||||
raise ValueError(f"Unrecognized algorithm: {request.param}")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def token(jwt_strategy: JWTStrategy):
|
||||
def _token(user_id=None, lifetime=LIFETIME):
|
||||
data = {"aud": "fastapi-users:auth"}
|
||||
if user_id is not None:
|
||||
data["user_id"] = str(user_id)
|
||||
return generate_jwt(data, secret, lifetime)
|
||||
return generate_jwt(
|
||||
data, jwt_strategy.encode_key, lifetime, algorithm=jwt_strategy.algorithm
|
||||
)
|
||||
|
||||
return _token
|
||||
|
||||
|
||||
@pytest.mark.parametrize("jwt_strategy", ["HS256", "RS256", "ES256"], indirect=True)
|
||||
@pytest.mark.authentication
|
||||
class TestReadToken:
|
||||
@pytest.mark.asyncio
|
||||
@ -69,17 +131,22 @@ class TestReadToken:
|
||||
assert authenticated_user.id == user.id
|
||||
|
||||
|
||||
@pytest.mark.parametrize("jwt_strategy", ["HS256", "RS256", "ES256"], indirect=True)
|
||||
@pytest.mark.authentication
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_token(jwt_strategy: JWTStrategy, user):
|
||||
token = await jwt_strategy.write_token(user)
|
||||
|
||||
decoded = decode_jwt(
|
||||
token, jwt_strategy.secret, audience=jwt_strategy.token_audience
|
||||
token,
|
||||
jwt_strategy.decode_key,
|
||||
audience=jwt_strategy.token_audience,
|
||||
algorithms=[jwt_strategy.algorithm],
|
||||
)
|
||||
assert decoded["user_id"] == str(user.id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("jwt_strategy", ["HS256", "RS256", "ES256"], indirect=True)
|
||||
@pytest.mark.authentication
|
||||
@pytest.mark.asyncio
|
||||
async def test_destroy_token(jwt_strategy: JWTStrategy, user):
|
||||
|
Reference in New Issue
Block a user