From d65b49acaafabf19c3783ff5cc1f645441414c47 Mon Sep 17 00:00:00 2001 From: grossmj Date: Thu, 3 Jun 2021 15:40:12 +0930 Subject: [PATCH] Add user permissions + RBAC tests. --- .../controller/dependencies/authentication.py | 10 +- gns3server/api/routes/controller/groups.py | 4 +- gns3server/api/routes/controller/projects.py | 25 ++- gns3server/api/routes/controller/roles.py | 4 +- gns3server/api/routes/controller/templates.py | 43 +++- gns3server/api/routes/controller/users.py | 63 ++++++ gns3server/db/models/permissions.py | 30 +-- gns3server/db/repositories/rbac.py | 126 +++++++---- .../api/routes/controller/test_permissions.py | 2 +- tests/api/routes/controller/test_roles.py | 64 +++--- tests/api/routes/controller/test_users.py | 79 +++++++ tests/controller/test_rbac.py | 203 ++++++++++++++++++ 12 files changed, 554 insertions(+), 99 deletions(-) create mode 100644 tests/controller/test_rbac.py diff --git a/gns3server/api/routes/controller/dependencies/authentication.py b/gns3server/api/routes/controller/dependencies/authentication.py index 36e35d47..0af058d7 100644 --- a/gns3server/api/routes/controller/dependencies/authentication.py +++ b/gns3server/api/routes/controller/dependencies/authentication.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import re from fastapi import Request, Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer @@ -60,11 +61,16 @@ async def get_current_active_user( ) # remove the prefix (e.g. "/v3") from URL path - if request.url.path.startswith("/v3"): - path = request.url.path[len("/v3"):] + match = re.search(r"^(/v[0-9]+).*", request.url.path) + if match: + path = request.url.path[len(match.group(1)):] else: path = request.url.path + # special case: always authorize access to the "/users/me" endpoint + if path == "/users/me": + return current_user + authorized = await rbac_repo.check_user_is_authorized(current_user.user_id, request.method, path) if not authorized: raise HTTPException( diff --git a/gns3server/api/routes/controller/groups.py b/gns3server/api/routes/controller/groups.py index 84c980a6..20b17ae4 100644 --- a/gns3server/api/routes/controller/groups.py +++ b/gns3server/api/routes/controller/groups.py @@ -100,7 +100,7 @@ async def update_user_group( raise ControllerNotFoundError(f"User group '{user_group_id}' not found") if user_group.builtin: - raise ControllerForbiddenError(f"User group '{user_group_id}' cannot be updated") + raise ControllerForbiddenError(f"Built-in user group '{user_group_id}' cannot be updated") return await users_repo.update_user_group(user_group_id, user_group_update) @@ -122,7 +122,7 @@ async def delete_user_group( raise ControllerNotFoundError(f"User group '{user_group_id}' not found") if user_group.builtin: - raise ControllerForbiddenError(f"User group '{user_group_id}' cannot be deleted") + raise ControllerForbiddenError(f"Built-in user group '{user_group_id}' cannot be deleted") success = await users_repo.delete_user_group(user_group_id) if not success: diff --git a/gns3server/api/routes/controller/projects.py b/gns3server/api/routes/controller/projects.py index 1d7aa2a5..55252c41 100644 --- a/gns3server/api/routes/controller/projects.py +++ b/gns3server/api/routes/controller/projects.py @@ -70,13 +70,25 @@ CHUNK_SIZE = 1024 * 8 # 8KB @router.get("", response_model=List[schemas.Project], response_model_exclude_unset=True) -def get_projects() -> List[schemas.Project]: +async def get_projects( + current_user: schemas.User = Depends(get_current_active_user), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) +) -> List[schemas.Project]: """ Return all projects. """ controller = Controller.instance() - return [p.asdict() for p in controller.projects.values()] + if current_user.is_superadmin: + return [p.asdict() for p in controller.projects.values()] + else: + user_projects = [] + for project in controller.projects.values(): + authorized = await rbac_repo.check_user_is_authorized( + current_user.user_id, "GET", f"/projects/{project.id}") + if authorized: + user_projects.append(project.asdict()) + return user_projects @router.post( @@ -97,7 +109,7 @@ async def create_project( controller = Controller.instance() project = await controller.add_project(**jsonable_encoder(project_data, exclude_unset=True)) - await rbac_repo.add_permission_to_user(current_user.user_id, f"/projects/{project.id}/*") + await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/projects/{project.id}/*") return project.asdict() @@ -135,7 +147,7 @@ async def delete_project( controller = Controller.instance() await project.delete() controller.remove_project(project) - await rbac_repo.delete_all_permissions_matching_path(f"/projects/{project.id}") + await rbac_repo.delete_all_permissions_with_path(f"/projects/{project.id}") @router.get("/{project_id}/stats") @@ -357,7 +369,9 @@ async def import_project( ) async def duplicate_project( project_data: schemas.ProjectDuplicate, - project: Project = Depends(dep_project) + project: Project = Depends(dep_project), + current_user: schemas.User = Depends(get_current_active_user), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> schemas.Project: """ Duplicate a project. @@ -374,6 +388,7 @@ async def duplicate_project( new_project = await project.duplicate( name=project_data.name, location=location, reset_mac_addresses=reset_mac_addresses ) + await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/projects/{new_project.id}/*") return new_project.asdict() diff --git a/gns3server/api/routes/controller/roles.py b/gns3server/api/routes/controller/roles.py index 1d013f97..c96feb64 100644 --- a/gns3server/api/routes/controller/roles.py +++ b/gns3server/api/routes/controller/roles.py @@ -96,7 +96,7 @@ async def update_role( raise ControllerNotFoundError(f"Role '{role_id}' not found") if role.builtin: - raise ControllerForbiddenError(f"Role '{role_id}' cannot be updated") + raise ControllerForbiddenError(f"Built-in role '{role_id}' cannot be updated") return await rbac_repo.update_role(role_id, role_update) @@ -115,7 +115,7 @@ async def delete_role( raise ControllerNotFoundError(f"Role '{role_id}' not found") if role.builtin: - raise ControllerForbiddenError(f"Role '{role_id}' cannot be deleted") + raise ControllerForbiddenError(f"Built-in role '{role_id}' cannot be deleted") success = await rbac_repo.delete_role(role_id) if not success: diff --git a/gns3server/api/routes/controller/templates.py b/gns3server/api/routes/controller/templates.py index ee54fdc7..a346545a 100644 --- a/gns3server/api/routes/controller/templates.py +++ b/gns3server/api/routes/controller/templates.py @@ -33,6 +33,9 @@ from gns3server import schemas from gns3server.controller import Controller from gns3server.db.repositories.templates import TemplatesRepository from gns3server.services.templates import TemplatesService +from gns3server.db.repositories.rbac import RbacRepository + +from .dependencies.authentication import get_current_active_user from .dependencies.database import get_repository responses = {404: {"model": schemas.ErrorMessage, "description": "Could not find template"}} @@ -44,12 +47,17 @@ router = APIRouter(responses=responses) async def create_template( template_create: schemas.TemplateCreate, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)), + current_user: schemas.User = Depends(get_current_active_user), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> schemas.Template: """ Create a new template. """ - return await TemplatesService(templates_repo).create_template(template_create) + template = await TemplatesService(templates_repo).create_template(template_create) + template_id = template.get("template_id") + await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/templates/{template_id}/*") + return template @router.get("/templates/{template_id}", response_model=schemas.Template, response_model_exclude_unset=True) @@ -92,35 +100,58 @@ async def update_template( status_code=status.HTTP_204_NO_CONTENT, ) async def delete_template( - template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)) + template_id: UUID, + templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> None: """ Delete a template. """ await TemplatesService(templates_repo).delete_template(template_id) + await rbac_repo.delete_all_permissions_with_path(f"/templates/{template_id}") @router.get("/templates", response_model=List[schemas.Template], response_model_exclude_unset=True) async def get_templates( - templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)), + templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)), + current_user: schemas.User = Depends(get_current_active_user), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> List[schemas.Template]: """ Return all templates. """ - return await TemplatesService(templates_repo).get_templates() + templates = await TemplatesService(templates_repo).get_templates() + if current_user.is_superadmin: + return templates + else: + user_templates = [] + for template in templates: + if template.get("builtin") is True: + user_templates.append(template) + continue + template_id = template.get("template_id") + authorized = await rbac_repo.check_user_is_authorized( + current_user.user_id, "GET", f"/templates/{template_id}") + if authorized: + user_templates.append(template) + return user_templates @router.post("/templates/{template_id}/duplicate", response_model=schemas.Template, status_code=status.HTTP_201_CREATED) async def duplicate_template( - template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)) + template_id: UUID, templates_repo: TemplatesRepository = Depends(get_repository(TemplatesRepository)), + current_user: schemas.User = Depends(get_current_active_user), + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) ) -> schemas.Template: """ Duplicate a template. """ - return await TemplatesService(templates_repo).duplicate_template(template_id) + template = await TemplatesService(templates_repo).duplicate_template(template_id) + await rbac_repo.add_permission_to_user_with_path(current_user.user_id, f"/templates/{template_id}/*") + return template @router.post( diff --git a/gns3server/api/routes/controller/users.py b/gns3server/api/routes/controller/users.py index df279e20..edf53b9a 100644 --- a/gns3server/api/routes/controller/users.py +++ b/gns3server/api/routes/controller/users.py @@ -32,6 +32,7 @@ from gns3server.controller.controller_error import ( ) from gns3server.db.repositories.users import UsersRepository +from gns3server.db.repositories.rbac import RbacRepository from gns3server.services import auth_service from .dependencies.authentication import get_current_active_user @@ -210,3 +211,65 @@ async def get_user_memberships( """ return await users_repo.get_user_memberships(user_id) + + +@router.get( + "/{user_id}/permissions", + dependencies=[Depends(get_current_active_user)], + response_model=List[schemas.Permission] +) +async def get_user_permissions( + user_id: UUID, + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) +) -> List[schemas.Permission]: + """ + Get user permissions. + """ + + return await rbac_repo.get_user_permissions(user_id) + + +@router.put( + "/{user_id}/permissions/{permission_id}", + dependencies=[Depends(get_current_active_user)], + status_code=status.HTTP_204_NO_CONTENT +) +async def add_permission_to_user( + user_id: UUID, + permission_id: UUID, + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)) +) -> None: + """ + Add a permission to an user. + """ + + permission = await rbac_repo.get_permission(permission_id) + if not permission: + raise ControllerNotFoundError(f"Permission '{permission_id}' not found") + + user = await rbac_repo.add_permission_to_user(user_id, permission) + if not user: + raise ControllerNotFoundError(f"User '{user_id}' not found") + + +@router.delete( + "/{user_id}/permissions/{permission_id}", + dependencies=[Depends(get_current_active_user)], + status_code=status.HTTP_204_NO_CONTENT +) +async def remove_permission_from_user( + user_id: UUID, + permission_id: UUID, + rbac_repo: RbacRepository = Depends(get_repository(RbacRepository)), +) -> None: + """ + Remove permission from an user. + """ + + permission = await rbac_repo.get_permission(permission_id) + if not permission: + raise ControllerNotFoundError(f"Permission '{permission_id}' not found") + + user = await rbac_repo.remove_permission_from_user(user_id, permission) + if not user: + raise ControllerNotFoundError(f"User '{user_id}' not found") diff --git a/gns3server/db/models/permissions.py b/gns3server/db/models/permissions.py index 534f2274..4779b6af 100644 --- a/gns3server/db/models/permissions.py +++ b/gns3server/db/models/permissions.py @@ -58,21 +58,27 @@ def create_default_roles(target, connection, **kw): "action": "ALLOW" }, { - "description": "Allow access to the logged in user", - "methods": ["GET"], - "path": "/users/me", - "action": "ALLOW" - }, - { - "description": "Allow to create a project or list projects", - "methods": ["GET", "POST"], + "description": "Allow to create and list projects", + "methods": ["GET", "HEAD", "POST"], "path": "/projects", "action": "ALLOW" }, { - "description": "Allow to access to all symbol endpoints", - "methods": ["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH"], - "path": "/symbols", + "description": "Allow to create and list templates", + "methods": ["GET", "HEAD", "POST"], + "path": "/templates", + "action": "ALLOW" + }, + { + "description": "Allow to list computes", + "methods": ["GET"], + "path": "/computes/*", + "action": "ALLOW" + }, + { + "description": "Allow access to all symbol endpoints", + "methods": ["GET", "HEAD", "POST"], + "path": "/symbols/*", "action": "ALLOW" }, ] @@ -106,7 +112,7 @@ def add_permissions_to_role(target, connection, **kw): role_id = result.first().role_id # add minimum required paths to the "User" role - for path in ("/projects", "/symbols", "/users/me"): + for path in ("/projects", "/templates", "/computes/*", "/symbols/*"): stmt = permissions_table.select().where(permissions_table.c.path == path) result = connection.execute(stmt) permission_id = result.first().permission_id diff --git a/gns3server/db/repositories/rbac.py b/gns3server/db/repositories/rbac.py index 39d52320..6e1096c9 100644 --- a/gns3server/db/repositories/rbac.py +++ b/gns3server/db/repositories/rbac.py @@ -216,6 +216,7 @@ class RbacRepository(BaseRepository): """ db_permission = models.Permission( + description=permission_create.description, methods=permission_create.methods, path=permission_create.path, action=permission_create.action, @@ -270,60 +271,76 @@ class RbacRepository(BaseRepository): log.debug(f"RBAC: checking permission {permission.methods} {permission.path} {permission.action}") if method not in permission.methods: continue - if permission.path.endswith("*") and path.startswith(permission.path[:-1]): + if permission.path.endswith("/*") and path.startswith(permission.path[:-2]): return permission elif permission.path == path: return permission - async def check_user_is_authorized(self, user_id: UUID, method: str, path: str) -> bool: + async def get_user_permissions(self, user_id: UUID): """ - Check if an user is authorized to access a resource. + Get all permissions from an user. """ - query = select(models.Permission).\ - join(models.Permission.roles). \ - join(models.Role.groups). \ - join(models.UserGroup.users). \ - filter(models.User.user_id == user_id).\ - order_by(models.Permission.path) - - result = await self._db_session.execute(query) - permissions = result.scalars().all() - log.debug(f"RBAC: checking authorization for '{user_id}' on {method} '{path}'") - matched_permission = self._match_permission(permissions, method, path) - if matched_permission: - log.debug(f"RBAC: matched role permission {matched_permission.methods} " - f"{matched_permission.path} {matched_permission.action}") - if matched_permission.action == "DENY": - return False - return True - - log.debug(f"RBAC: could not find a role permission, checking user permissions...") query = select(models.Permission).\ join(models.User.permissions). \ filter(models.User.user_id == user_id).\ order_by(models.Permission.path) result = await self._db_session.execute(query) - permissions = result.scalars().all() - matched_permission = self._match_permission(permissions, method, path) - if matched_permission: - log.debug(f"RBAC: matched user permission {matched_permission.methods} " - f"{matched_permission.path} {matched_permission.action}") - if matched_permission.action == "DENY": - return False - return True + return result.scalars().all() - return False - - async def add_permission_to_user(self, user_id: UUID, path: str) -> Union[None, models.User]: + async def add_permission_to_user( + self, + user_id: UUID, + permission: models.Permission + ) -> Union[None, models.User]: """ Add a permission to an user. """ - # Create a new permission with full rights + query = select(models.User).\ + options(selectinload(models.User.permissions)).\ + where(models.User.user_id == user_id) + result = await self._db_session.execute(query) + user_db = result.scalars().first() + if not user_db: + return None + + user_db.permissions.append(permission) + await self._db_session.commit() + await self._db_session.refresh(user_db) + return user_db + + async def remove_permission_from_user( + self, + user_id: UUID, + permission: models.Permission + ) -> Union[None, models.User]: + """ + Remove a permission from a role. + """ + + query = select(models.User).\ + options(selectinload(models.User.permissions)).\ + where(models.User.user_id == user_id) + result = await self._db_session.execute(query) + user_db = result.scalars().first() + if not user_db: + return None + + user_db.permissions.remove(permission) + await self._db_session.commit() + await self._db_session.refresh(user_db) + return user_db + + async def add_permission_to_user_with_path(self, user_id: UUID, path: str) -> Union[None, models.User]: + """ + Add a permission to an user. + """ + + # Create a new permission with full rights on path new_permission = schemas.PermissionCreate( - description=f"Allow access to project {path}", + description=f"Allow access to {path}", methods=[HTTPMethods.get, HTTPMethods.head, HTTPMethods.post, HTTPMethods.put, HTTPMethods.delete], path=path, action=PermissionAction.allow @@ -345,9 +362,9 @@ class RbacRepository(BaseRepository): await self._db_session.refresh(user_db) return user_db - async def delete_all_permissions_matching_path(self, path: str) -> None: + async def delete_all_permissions_with_path(self, path: str) -> None: """ - Delete all permissions matching with path. + Delete all permissions with path. """ query = delete(models.Permission).\ @@ -355,3 +372,38 @@ class RbacRepository(BaseRepository): execution_options(synchronize_session=False) result = await self._db_session.execute(query) log.debug(f"{result.rowcount} permission(s) have been deleted") + + async def check_user_is_authorized(self, user_id: UUID, method: str, path: str) -> bool: + """ + Check if an user is authorized to access a resource. + """ + + query = select(models.Permission).\ + join(models.Permission.roles). \ + join(models.Role.groups). \ + join(models.UserGroup.users). \ + filter(models.User.user_id == user_id).\ + order_by(models.Permission.path) + + result = await self._db_session.execute(query) + permissions = result.scalars().all() + log.debug(f"RBAC: checking authorization for user '{user_id}' on {method} '{path}'") + matched_permission = self._match_permission(permissions, method, path) + if matched_permission: + log.debug(f"RBAC: matched role permission {matched_permission.methods} " + f"{matched_permission.path} {matched_permission.action}") + if matched_permission.action == "DENY": + return False + return True + + log.debug(f"RBAC: could not find a role permission, checking user permissions...") + permissions = await self.get_user_permissions(user_id) + matched_permission = self._match_permission(permissions, method, path) + if matched_permission: + log.debug(f"RBAC: matched user permission {matched_permission.methods} " + f"{matched_permission.path} {matched_permission.action}") + if matched_permission.action == "DENY": + return False + return True + + return False diff --git a/tests/api/routes/controller/test_permissions.py b/tests/api/routes/controller/test_permissions.py index fc15087f..1bd4e0ff 100644 --- a/tests/api/routes/controller/test_permissions.py +++ b/tests/api/routes/controller/test_permissions.py @@ -50,7 +50,7 @@ class TestPermissionRoutes: response = await client.get(app.url_path_for("get_permissions")) assert response.status_code == status.HTTP_200_OK - assert len(response.json()) == 5 # 4 default permissions + 1 custom permission + assert len(response.json()) == 6 # 5 default permissions + 1 custom permission async def test_update_permission(self, app: FastAPI, client: AsyncClient, db_session: AsyncSession) -> None: diff --git a/tests/api/routes/controller/test_roles.py b/tests/api/routes/controller/test_roles.py index 93b21004..20646d92 100644 --- a/tests/api/routes/controller/test_roles.py +++ b/tests/api/routes/controller/test_roles.py @@ -64,21 +64,21 @@ class TestRolesRoutes: updated_role_in_db = await rbac_repo.get_role(role_in_db.role_id) assert updated_role_in_db.name == "role42" - # async def test_cannot_update_admin_group( - # self, - # app: FastAPI, - # client: AsyncClient, - # db_session: AsyncSession - # ) -> None: - # - # user_repo = UsersRepository(db_session) - # group_in_db = await user_repo.get_user_group_by_name("Administrators") - # update_group = {"name": "Hackers"} - # response = await client.put( - # app.url_path_for("update_user_group", user_group_id=group_in_db.user_group_id), - # json=update_group - # ) - # assert response.status_code == status.HTTP_403_FORBIDDEN + async def test_cannot_update_builtin_user_role( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession + ) -> None: + + rbac_repo = RbacRepository(db_session) + role_in_db = await rbac_repo.get_role_by_name("User") + update_role = {"name": "Hackers"} + response = await client.put( + app.url_path_for("update_role", role_id=role_in_db.role_id), + json=update_role + ) + assert response.status_code == status.HTTP_403_FORBIDDEN async def test_delete_role( self, @@ -92,29 +92,29 @@ class TestRolesRoutes: response = await client.delete(app.url_path_for("delete_role", role_id=role_in_db.role_id)) assert response.status_code == status.HTTP_204_NO_CONTENT - # async def test_cannot_delete_admin_group( - # self, - # app: FastAPI, - # client: AsyncClient, - # db_session: AsyncSession - # ) -> None: - # - # user_repo = UsersRepository(db_session) - # group_in_db = await user_repo.get_user_group_by_name("Administrators") - # response = await client.delete(app.url_path_for("delete_user_group", user_group_id=group_in_db.user_group_id)) - # assert response.status_code == status.HTTP_403_FORBIDDEN + async def test_cannot_delete_builtin_administrator_role( + self, + app: FastAPI, + client: AsyncClient, + db_session: AsyncSession + ) -> None: + + rbac_repo = RbacRepository(db_session) + role_in_db = await rbac_repo.get_role_by_name("Administrator") + response = await client.delete(app.url_path_for("delete_role", role_id=role_in_db.role_id)) + assert response.status_code == status.HTTP_403_FORBIDDEN @pytest.fixture async def test_permission(db_session: AsyncSession) -> Permission: new_permission = schemas.PermissionCreate( - methods=[HTTPMethods.get, HTTPMethods.post], - path="/templates", + methods=[HTTPMethods.get], + path="/statistics", action=PermissionAction.allow ) rbac_repo = RbacRepository(db_session) - existing_permission = await rbac_repo.get_permission_by_path("/templates") + existing_permission = await rbac_repo.get_permission_by_path("/statistics") if existing_permission: return existing_permission return await rbac_repo.create_permission(new_permission) @@ -142,7 +142,7 @@ class TestRolesPermissionsRoutes: ) assert response.status_code == status.HTTP_204_NO_CONTENT permissions = await rbac_repo.get_role_permissions(role_in_db.role_id) - assert len(permissions) == 4 # 3 default + 1 custom permissions + assert len(permissions) == 5 # 4 default permissions + 1 custom permission async def test_get_role_permissions( self, @@ -160,7 +160,7 @@ class TestRolesPermissionsRoutes: role_id=role_in_db.role_id) ) assert response.status_code == status.HTTP_200_OK - assert len(response.json()) == 4 # 3 default + 1 custom permissions + assert len(response.json()) == 5 # 4 default permissions + 1 custom permission async def test_remove_role_from_group( self, @@ -182,4 +182,4 @@ class TestRolesPermissionsRoutes: ) assert response.status_code == status.HTTP_204_NO_CONTENT permissions = await rbac_repo.get_role_permissions(role_in_db.role_id) - assert len(permissions) == 3 # 3 default permissions + assert len(permissions) == 4 # 4 default permissions diff --git a/tests/api/routes/controller/test_users.py b/tests/api/routes/controller/test_users.py index 36b16c10..ce0d8801 100644 --- a/tests/api/routes/controller/test_users.py +++ b/tests/api/routes/controller/test_users.py @@ -25,9 +25,12 @@ from jose import jwt from sqlalchemy.ext.asyncio import AsyncSession from gns3server.db.repositories.users import UsersRepository +from gns3server.db.repositories.rbac import RbacRepository +from gns3server.schemas.controller.rbac import Permission, HTTPMethods, PermissionAction from gns3server.services import auth_service from gns3server.config import Config from gns3server.schemas.controller.users import User +from gns3server import schemas import gns3server.db.models as models pytestmark = pytest.mark.asyncio @@ -341,3 +344,79 @@ class TestSuperAdmin: # response = await client.get(app.url_path_for("get_user_memberships", user_id=admin_in_db.user_id)) # assert response.status_code == status.HTTP_200_OK # assert len(response.json()) == 1 + +@pytest.fixture +async def test_permission(db_session: AsyncSession) -> Permission: + + new_permission = schemas.PermissionCreate( + methods=[HTTPMethods.get], + path="/statistics", + action=PermissionAction.allow + ) + rbac_repo = RbacRepository(db_session) + existing_permission = await rbac_repo.get_permission_by_path("/statistics") + if existing_permission: + return existing_permission + return await rbac_repo.create_permission(new_permission) + + +class TestUserPermissionsRoutes: + + async def test_add_permission_to_user( + self, + app: FastAPI, + client: AsyncClient, + test_user: User, + test_permission: Permission, + db_session: AsyncSession + ) -> None: + + response = await client.put( + app.url_path_for( + "add_permission_to_user", + user_id=str(test_user.user_id), + permission_id=str(test_permission.permission_id) + ) + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + rbac_repo = RbacRepository(db_session) + permissions = await rbac_repo.get_user_permissions(test_user.user_id) + assert len(permissions) == 1 + assert permissions[0].permission_id == test_permission.permission_id + + async def test_get_user_permissions( + self, + app: FastAPI, + client: AsyncClient, + test_user: User, + db_session: AsyncSession + ) -> None: + + response = await client.get( + app.url_path_for( + "get_user_permissions", + user_id=str(test_user.user_id)) + ) + assert response.status_code == status.HTTP_200_OK + assert len(response.json()) == 1 + + async def test_remove_permission_from_user( + self, + app: FastAPI, + client: AsyncClient, + test_user: User, + test_permission: Permission, + db_session: AsyncSession + ) -> None: + + response = await client.delete( + app.url_path_for( + "remove_permission_from_user", + user_id=str(test_user.user_id), + permission_id=str(test_permission.permission_id) + ), + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + rbac_repo = RbacRepository(db_session) + permissions = await rbac_repo.get_user_permissions(test_user.user_id) + assert len(permissions) == 0 diff --git a/tests/controller/test_rbac.py b/tests/controller/test_rbac.py new file mode 100644 index 00000000..faa4e6df --- /dev/null +++ b/tests/controller/test_rbac.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python +# +# Copyright (C) 2021 GNS3 Technologies Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest + +from fastapi import FastAPI, status +from httpx import AsyncClient + +from sqlalchemy.ext.asyncio import AsyncSession +from gns3server.db.repositories.rbac import RbacRepository +from gns3server.db.models import User + +pytestmark = pytest.mark.asyncio + + +class TestPermissions: + + @pytest.mark.parametrize( + "method, path, result", + ( + ("GET", "/users", False), + ("GET", "/projects", True), + ("GET", "/projects/e451ad73-2519-4f83-87fe-a8e821792d44", False), + ("POST", "/projects", True), + ("GET", "/templates", True), + ("GET", "/templates/62e92cf1-244a-4486-8dae-b95439b54da9", False), + ("POST", "/templates", True), + ("GET", "/computes", True), + ("GET", "/computes/local", True), + ("GET", "/symbols", True), + ("GET", "/symbols/default_symbols", True), + ), + ) + async def test_default_permissions_user_group( + self, + app: FastAPI, + authorized_client: AsyncClient, + test_user: User, + db_session: AsyncSession, + method: str, + path: str, + result: bool + ) -> None: + + rbac_repo = RbacRepository(db_session) + authorized = await rbac_repo.check_user_is_authorized(test_user.user_id, method, path) + assert authorized == result + + +class TestProjectsWithRbac: + + async def test_admin_create_project(self, app: FastAPI, client: AsyncClient): + + params = {"name": "Admin project"} + response = await client.post(app.url_path_for("create_project"), json=params) + assert response.status_code == status.HTTP_201_CREATED + + async def test_user_only_access_own_projects( + self, + app: FastAPI, + authorized_client: AsyncClient, + test_user: User, + db_session: AsyncSession + ) -> None: + + params = {"name": "User project"} + response = await authorized_client.post(app.url_path_for("create_project"), json=params) + assert response.status_code == status.HTTP_201_CREATED + project_id = response.json()["project_id"] + + rbac_repo = RbacRepository(db_session) + permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id) + assert len(permissions_in_db) == 1 + assert permissions_in_db[0].path == f"/projects/{project_id}/*" + + response = await authorized_client.get(app.url_path_for("get_projects")) + assert response.status_code == status.HTTP_200_OK + projects = response.json() + assert len(projects) == 1 + + async def test_admin_access_all_projects(self, app: FastAPI, client: AsyncClient): + + response = await client.get(app.url_path_for("get_projects")) + assert response.status_code == status.HTTP_200_OK + projects = response.json() + assert len(projects) == 2 + + async def test_admin_user_give_permission_on_project( + self, + app: FastAPI, + client: AsyncClient, + test_user: User + ): + + response = await client.get(app.url_path_for("get_projects")) + assert response.status_code == status.HTTP_200_OK + projects = response.json() + project_id = None + for project in projects: + if project["name"] == "Admin project": + project_id = project["project_id"] + break + + new_permission = { + "methods": ["GET"], + "path": f"/projects/{project_id}", + "action": "ALLOW" + } + response = await client.post(app.url_path_for("create_permission"), json=new_permission) + assert response.status_code == status.HTTP_201_CREATED + permission_id = response.json()["permission_id"] + + response = await client.put( + app.url_path_for( + "add_permission_to_user", + user_id=test_user.user_id, + permission_id=permission_id + ) + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + async def test_user_access_admin_project( + self, + app: FastAPI, + authorized_client: AsyncClient, + test_user: User, + db_session: AsyncSession + ) -> None: + + response = await authorized_client.get(app.url_path_for("get_projects")) + assert response.status_code == status.HTTP_200_OK + projects = response.json() + assert len(projects) == 2 + + +class TestTemplatesWithRbac: + + async def test_admin_create_template(self, app: FastAPI, client: AsyncClient): + + new_template = {"base_script_file": "vpcs_base_config.txt", + "category": "guest", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "PC{0}", + "name": "ADMIN_VPCS_TEMPLATE", + "compute_id": "local", + "symbol": ":/symbols/vpcs_guest.svg", + "template_type": "vpcs"} + + response = await client.post(app.url_path_for("create_template"), json=new_template) + assert response.status_code == status.HTTP_201_CREATED + + async def test_user_only_access_own_templates( + self, app: FastAPI, + authorized_client: AsyncClient, + test_user: User, + db_session: AsyncSession + ) -> None: + + new_template = {"base_script_file": "vpcs_base_config.txt", + "category": "guest", + "console_auto_start": False, + "console_type": "telnet", + "default_name_format": "PC{0}", + "name": "USER_VPCS_TEMPLATE", + "compute_id": "local", + "symbol": ":/symbols/vpcs_guest.svg", + "template_type": "vpcs"} + + response = await authorized_client.post(app.url_path_for("create_template"), json=new_template) + assert response.status_code == status.HTTP_201_CREATED + template_id = response.json()["template_id"] + + rbac_repo = RbacRepository(db_session) + permissions_in_db = await rbac_repo.get_user_permissions(test_user.user_id) + assert len(permissions_in_db) == 1 + assert permissions_in_db[0].path == f"/templates/{template_id}/*" + + response = await authorized_client.get(app.url_path_for("get_templates")) + assert response.status_code == status.HTTP_200_OK + templates = [template for template in response.json() if template["builtin"] is False] + assert len(templates) == 1 + + async def test_admin_access_all_templates(self, app: FastAPI, client: AsyncClient): + + response = await client.get(app.url_path_for("get_templates")) + assert response.status_code == status.HTTP_200_OK + templates = [template for template in response.json() if template["builtin"] is False] + assert len(templates) == 2