#!/usr/bin/env python # # Copyright (C) 2020 GNS3 Technologies Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ API routes for users. """ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from uuid import UUID from typing import List from gns3server import schemas from gns3server.controller.controller_error import ( ControllerBadRequestError, ControllerNotFoundError, ControllerForbiddenError, ) from gns3server.db.repositories.users import UsersRepository from gns3server.services import auth_service from .dependencies.authentication import get_current_active_user from .dependencies.database import get_repository import logging log = logging.getLogger(__name__) router = APIRouter() @router.post("/login", response_model=schemas.Token) async def login( users_repo: UsersRepository = Depends(get_repository(UsersRepository)), form_data: OAuth2PasswordRequestForm = Depends(), ) -> schemas.Token: """ Default user login method using forms (x-www-form-urlencoded). Example: curl http://host:port/v3/users/login -H "Content-Type: application/x-www-form-urlencoded" -d "username=admin&password=admin" """ user = await users_repo.authenticate_user(username=form_data.username, password=form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication was unsuccessful.", headers={"WWW-Authenticate": "Bearer"}, ) token = schemas.Token(access_token=auth_service.create_access_token(user.username), token_type="bearer") return token @router.post("/authenticate", response_model=schemas.Token) async def authenticate( user_credentials: schemas.Credentials, users_repo: UsersRepository = Depends(get_repository(UsersRepository)), ) -> schemas.Token: """ Alternative authentication method using json. Example: curl http://host:port/v3/users/authenticate -d '{"username": "admin", "password": "admin"}' """ user = await users_repo.authenticate_user(username=user_credentials.username, password=user_credentials.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication was unsuccessful.", headers={"WWW-Authenticate": "Bearer"}, ) token = schemas.Token(access_token=auth_service.create_access_token(user.username), token_type="bearer") return token @router.get("", response_model=List[schemas.User], dependencies=[Depends(get_current_active_user)]) async def get_users( users_repo: UsersRepository = Depends(get_repository(UsersRepository)) ) -> List[schemas.User]: """ Get all users. """ return await users_repo.get_users() @router.post( "", response_model=schemas.User, dependencies=[Depends(get_current_active_user)], status_code=status.HTTP_201_CREATED ) async def create_user( user_create: schemas.UserCreate, users_repo: UsersRepository = Depends(get_repository(UsersRepository)) ) -> schemas.User: """ Create a new user. """ if await users_repo.get_user_by_username(user_create.username): raise ControllerBadRequestError(f"Username '{user_create.username}' is already registered") if user_create.email and await users_repo.get_user_by_email(user_create.email): raise ControllerBadRequestError(f"Email '{user_create.email}' is already registered") return await users_repo.create_user(user_create) @router.get("/{user_id}", dependencies=[Depends(get_current_active_user)], response_model=schemas.User) async def get_user( user_id: UUID, users_repo: UsersRepository = Depends(get_repository(UsersRepository)), ) -> schemas.User: """ Get an user. """ user = await users_repo.get_user(user_id) if not user: raise ControllerNotFoundError(f"User '{user_id}' not found") return user @router.put("/{user_id}", dependencies=[Depends(get_current_active_user)], response_model=schemas.User) async def update_user( user_id: UUID, user_update: schemas.UserUpdate, users_repo: UsersRepository = Depends(get_repository(UsersRepository)) ) -> schemas.User: """ Update an user. """ user = await users_repo.update_user(user_id, user_update) if not user: raise ControllerNotFoundError(f"User '{user_id}' not found") return user @router.delete( "/{user_id}", dependencies=[Depends(get_current_active_user)], status_code=status.HTTP_204_NO_CONTENT ) async def delete_user( user_id: UUID, users_repo: UsersRepository = Depends(get_repository(UsersRepository)), ) -> None: """ Delete an user. """ user = await users_repo.get_user(user_id) if not user: raise ControllerNotFoundError(f"User '{user_id}' not found") if user.is_superadmin: raise ControllerForbiddenError("The super admin cannot be deleted") success = await users_repo.delete_user(user_id) if not success: raise ControllerNotFoundError(f"User '{user_id}' could not be deleted") @router.get("/me/", response_model=schemas.User) async def get_current_active_user(current_user: schemas.User = Depends(get_current_active_user)) -> schemas.User: """ Get the current active user. """ return current_user @router.get( "/{user_id}/groups", dependencies=[Depends(get_current_active_user)], response_model=List[schemas.UserGroup] ) async def get_user_memberships( user_id: UUID, users_repo: UsersRepository = Depends(get_repository(UsersRepository)) ) -> List[schemas.UserGroup]: """ Get user memberships. """ return await users_repo.get_user_memberships(user_id)