Merge pull request #2469 from GNS3/joserfc-migration

Replace python-jose library by joserfc
This commit is contained in:
Jeremy Grossmann 2024-12-30 16:02:17 +07:00 committed by GitHub
commit 96c6805ace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 17 deletions

View File

@ -14,8 +14,9 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from joserfc import jwt
from jose import JWTError, jwt from joserfc.jwk import OctKey
from joserfc.errors import JoseError
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import bcrypt import bcrypt
@ -56,7 +57,8 @@ class AuthService:
secret_key = DEFAULT_JWT_SECRET_KEY secret_key = DEFAULT_JWT_SECRET_KEY
log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!") log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!")
algorithm = Config.instance().settings.Controller.jwt_algorithm algorithm = Config.instance().settings.Controller.jwt_algorithm
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm) key = OctKey.import_key(secret_key)
encoded_jwt = jwt.encode({"alg": algorithm}, to_encode, key)
return encoded_jwt return encoded_jwt
def get_username_from_token(self, token: str, secret_key: str = None) -> Optional[str]: def get_username_from_token(self, token: str, secret_key: str = None) -> Optional[str]:
@ -73,11 +75,12 @@ class AuthService:
secret_key = DEFAULT_JWT_SECRET_KEY secret_key = DEFAULT_JWT_SECRET_KEY
log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!") log.error("A JWT secret key must be configured to secure the server, using an unsecured default key!")
algorithm = Config.instance().settings.Controller.jwt_algorithm algorithm = Config.instance().settings.Controller.jwt_algorithm
payload = jwt.decode(token, secret_key, algorithms=[algorithm]) key = OctKey.import_key(secret_key)
username: str = payload.get("sub") payload = jwt.decode(token, key, algorithms=[algorithm])
username: str = payload.claims.get("sub")
if username is None: if username is None:
raise credentials_exception raise credentials_exception
token_data = TokenData(username=username) token_data = TokenData(username=username)
except (JWTError, ValidationError): except (JoseError, ValidationError, ValueError):
raise credentials_exception raise credentials_exception
return token_data.username return token_data.username

View File

@ -16,7 +16,7 @@ sqlalchemy==2.0.36
aiosqlite==0.20.0 aiosqlite==0.20.0
alembic==1.14.0 alembic==1.14.0
bcrypt==4.2.1 bcrypt==4.2.1
python-jose[cryptography]==3.3.0 joserfc==1.0.1
email-validator==2.2.0 email-validator==2.2.0
watchfiles==1.0.3 watchfiles==1.0.3
zstandard==0.23.0 zstandard==0.23.0

View File

@ -21,7 +21,8 @@ from typing import Optional
from fastapi import FastAPI, HTTPException, status from fastapi import FastAPI, HTTPException, status
from sqlalchemy import update from sqlalchemy import update
from httpx import AsyncClient from httpx import AsyncClient
from jose import jwt from joserfc import jwt
from joserfc.jwk import OctKey
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from gns3server.db.repositories.users import UsersRepository from gns3server.db.repositories.users import UsersRepository
@ -166,16 +167,23 @@ class TestAuthTokens:
jwt_secret = config.settings.Controller.jwt_secret_key jwt_secret = config.settings.Controller.jwt_secret_key
token = auth_service.create_access_token(test_user.username) token = auth_service.create_access_token(test_user.username)
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) key = OctKey.import_key(jwt_secret)
username = payload.get("sub") payload = jwt.decode(token, key, algorithms=["HS256"])
username = payload.claims.get("sub")
assert username == test_user.username assert username == test_user.username
async def test_token_missing_user_is_invalid(self, app: FastAPI, client: AsyncClient, config: Config) -> None: async def test_decode_token_with_wrong_algorithm(
self,
app: FastAPI,
client: AsyncClient,
test_user: User,
config: Config
) -> None:
jwt_secret = config.settings.Controller.jwt_secret_key jwt_secret = config.settings.Controller.jwt_secret_key
token = auth_service.create_access_token(None) token = auth_service.create_access_token(test_user.username)
with pytest.raises(jwt.JWTError): with pytest.raises(ValueError):
jwt.decode(token, jwt_secret, algorithms=["HS256"]) jwt.decode(token, jwt_secret, algorithms=["ES256"])
async def test_can_retrieve_username_from_token( async def test_can_retrieve_username_from_token(
self, self,
@ -236,9 +244,10 @@ class TestUserLogin:
# check that token exists in response and has user encoded within it # check that token exists in response and has user encoded within it
token = response.json().get("access_token") token = response.json().get("access_token")
payload = jwt.decode(token, jwt_secret, algorithms=["HS256"]) key = OctKey.import_key(jwt_secret)
assert "sub" in payload payload = jwt.decode(token, key, algorithms=["HS256"])
username = payload.get("sub") assert "sub" in payload.claims
username = payload.claims.get("sub")
assert username == test_user.username assert username == test_user.username
# check that token is proper type # check that token is proper type