Handle creating Qemu disk images and resizing

This commit is contained in:
grossmj 2022-04-07 16:21:47 +08:00
parent 888c773dc0
commit fda2a37b98
20 changed files with 701 additions and 472 deletions

View File

@ -51,6 +51,11 @@ from . import virtualbox_nodes
from . import vmware_nodes
from . import vpcs_nodes
import logging
log = logging.getLogger(__name__)
compute_api = FastAPI(
title="GNS3 compute API",
dependencies=[Depends(compute_authentication)],
@ -63,6 +68,7 @@ compute_api.state.controller_host = None
@compute_api.exception_handler(ComputeError)
async def controller_error_handler(request: Request, exc: ComputeError):
log.error(f"Compute error: {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc)},
@ -71,6 +77,7 @@ async def controller_error_handler(request: Request, exc: ComputeError):
@compute_api.exception_handler(ComputeTimeoutError)
async def controller_timeout_error_handler(request: Request, exc: ComputeTimeoutError):
log.error(f"Compute timeout error: {exc}")
return JSONResponse(
status_code=408,
content={"message": str(exc)},
@ -79,6 +86,7 @@ async def controller_timeout_error_handler(request: Request, exc: ComputeTimeout
@compute_api.exception_handler(ComputeUnauthorizedError)
async def controller_unauthorized_error_handler(request: Request, exc: ComputeUnauthorizedError):
log.error(f"Compute unauthorized error: {exc}")
return JSONResponse(
status_code=401,
content={"message": str(exc)},
@ -87,6 +95,7 @@ async def controller_unauthorized_error_handler(request: Request, exc: ComputeUn
@compute_api.exception_handler(ComputeForbiddenError)
async def controller_forbidden_error_handler(request: Request, exc: ComputeForbiddenError):
log.error(f"Compute forbidden error: {exc}")
return JSONResponse(
status_code=403,
content={"message": str(exc)},
@ -95,6 +104,7 @@ async def controller_forbidden_error_handler(request: Request, exc: ComputeForbi
@compute_api.exception_handler(ComputeNotFoundError)
async def controller_not_found_error_handler(request: Request, exc: ComputeNotFoundError):
log.error(f"Compute not found error: {exc}")
return JSONResponse(
status_code=404,
content={"message": str(exc)},
@ -103,6 +113,7 @@ async def controller_not_found_error_handler(request: Request, exc: ComputeNotFo
@compute_api.exception_handler(GNS3VMError)
async def controller_error_handler(request: Request, exc: GNS3VMError):
log.error(f"Compute GNS3 VM error: {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc)},
@ -111,6 +122,7 @@ async def controller_error_handler(request: Request, exc: GNS3VMError):
@compute_api.exception_handler(ImageMissingError)
async def image_missing_error_handler(request: Request, exc: ImageMissingError):
log.error(f"Compute image missing error: {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc), "image": exc.image, "exception": exc.__class__.__name__},
@ -119,6 +131,7 @@ async def image_missing_error_handler(request: Request, exc: ImageMissingError):
@compute_api.exception_handler(NodeError)
async def node_error_handler(request: Request, exc: NodeError):
log.error(f"Compute node error: {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc), "exception": exc.__class__.__name__},
@ -127,6 +140,7 @@ async def node_error_handler(request: Request, exc: NodeError):
@compute_api.exception_handler(UbridgeError)
async def ubridge_error_handler(request: Request, exc: UbridgeError):
log.error(f"Compute uBridge error: {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc), "exception": exc.__class__.__name__},

View File

@ -144,55 +144,6 @@ async def get_qemu_capabilities() -> dict:
return capabilities
@router.post(
"/qemu/img",
status_code=status.HTTP_204_NO_CONTENT,
responses={403: {"model": schemas.ErrorMessage, "description": "Forbidden to create Qemu image"}},
)
async def create_qemu_image(image_data: schemas.QemuImageCreate) -> Response:
"""
Create a Qemu image.
"""
#FIXME: move to controller
raise NotImplementedError()
# Raise error if user try to escape
#if not is_safe_path(image_data.path, project.path):
# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
await Qemu.instance().create_disk(
image_data.qemu_img,
image_data.path,
jsonable_encoder(image_data, exclude_unset=True, exclude={"qemu_img", "path"})
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.put(
"/qemu/img",
status_code=status.HTTP_204_NO_CONTENT,
responses={403: {"model": schemas.ErrorMessage, "description": "Forbidden to update Qemu image"}},
)
async def update_qemu_image(image_data: schemas.QemuImageUpdate) -> Response:
"""
Update a Qemu image.
"""
#FIXME: move to controller
raise NotImplementedError()
# Raise error if user try to escape
#if not is_safe_path(image_data.path, project.path):
# raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if image_data.extend:
await Qemu.instance().resize_disk(image_data.qemu_img, image_data.path, image_data.extend)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.get("/virtualbox/vms", response_model=List[dict])
async def get_virtualbox_vms() -> List[dict]:

View File

@ -26,10 +26,10 @@ from fastapi.responses import StreamingResponse
from uuid import UUID
from gns3server import schemas
from gns3server.compute.project_manager import ProjectManager
from gns3server.compute.qemu import Qemu
from gns3server.compute.qemu.qemu_vm import QemuVM
responses = {404: {"model": schemas.ErrorMessage, "description": "Could not find project or Qemu node"}}
router = APIRouter(responses=responses)
@ -126,10 +126,39 @@ async def duplicate_qemu_node(
return new_node.asdict()
@router.post("/{node_id}/resize_disk", status_code=status.HTTP_204_NO_CONTENT)
async def resize_qemu_node_disk(node_data: schemas.QemuDiskResize, node: QemuVM = Depends(dep_node)) -> Response:
@router.post(
"/{node_id}/disk_image/{disk_name}",
status_code=status.HTTP_204_NO_CONTENT
)
async def create_qemu_disk_image(
disk_name: str,
disk_data: schemas.QemuDiskImageCreate,
node: QemuVM = Depends(dep_node)
) -> Response:
"""
Create a Qemu disk image.
"""
await node.resize_disk(node_data.drive_name, node_data.extend)
options = jsonable_encoder(disk_data, exclude_unset=True)
await node.create_disk_image(disk_name, options)
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.put(
"/{node_id}/disk_image/{disk_name}",
status_code=status.HTTP_204_NO_CONTENT
)
async def update_qemu_disk_image(
disk_name: str,
disk_data: schemas.QemuDiskImageUpdate,
node: QemuVM = Depends(dep_node)
) -> Response:
"""
Update a Qemu disk image.
"""
if disk_data.extend:
await node.resize_disk_image(disk_name, disk_data.extend)
return Response(status_code=status.HTTP_204_NO_CONTENT)

View File

@ -32,7 +32,7 @@ from gns3server.controller.node import Node
from gns3server.controller.project import Project
from gns3server.utils import force_unix_path
from gns3server.utils.http_client import HTTPClient
from gns3server.controller.controller_error import ControllerForbiddenError
from gns3server.controller.controller_error import ControllerForbiddenError, ControllerBadRequestError
from gns3server import schemas
import logging
@ -300,6 +300,8 @@ async def auto_idlepc(node: Node = Depends(dep_node)) -> str:
Compute an Idle-PC value for a Dynamips node
"""
if node.node_type != "dynamips":
raise ControllerBadRequestError("Auto Idle-PC is only supported on a Dynamips node")
return await node.dynamips_auto_idlepc()
@ -309,16 +311,40 @@ async def idlepc_proposals(node: Node = Depends(dep_node)) -> List[str]:
Compute a list of potential idle-pc values for a Dynamips node
"""
if node.node_type != "dynamips":
raise ControllerBadRequestError("Idle-PC proposals is only supported on a Dynamips node")
return await node.dynamips_idlepc_proposals()
@router.post("/{node_id}/resize_disk", status_code=status.HTTP_204_NO_CONTENT)
async def resize_disk(resize_data: dict, node: Node = Depends(dep_node)) -> Response:
@router.post("/{node_id}/qemu/disk_image/{disk_name}", status_code=status.HTTP_204_NO_CONTENT)
async def create_disk_image(
disk_name: str,
disk_data: schemas.QemuDiskImageCreate,
node: Node = Depends(dep_node)
) -> Response:
"""
Resize a disk image.
Create a Qemu disk image.
"""
await node.post("/resize_disk", **resize_data)
if node.node_type != "qemu":
raise ControllerBadRequestError("Creating a disk image is only supported on a Qemu node")
await node.post(f"/disk_image/{disk_name}", data=disk_data.dict(exclude_unset=True))
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.put("/{node_id}/qemu/disk_image/{disk_name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_disk_image(
disk_name: str,
disk_data: schemas.QemuDiskImageUpdate,
node: Node = Depends(dep_node)
) -> Response:
"""
Update a Qemu disk image.
"""
if node.node_type != "qemu":
raise ControllerBadRequestError("Updating a disk image is only supported on a Qemu node")
await node.put(f"/disk_image/{disk_name}", data=disk_data.dict(exclude_unset=True))
return Response(status_code=status.HTTP_204_NO_CONTENT)

View File

@ -75,7 +75,7 @@ async def authenticate(
) -> schemas.Token:
"""
Alternative authentication method using json.
Example: curl http://host:port/v3/users/authenticate -d '{"username": "admin", "password": "admin"} -H "Content-Type: application/json" '
Example: curl http://host:port/v3/users/authenticate -d '{"username": "admin", "password": "admin"}' -H "Content-Type: application/json"
"""
user = await users_repo.authenticate_user(username=user_credentials.username, password=user_credentials.password)

View File

@ -34,6 +34,7 @@ from gns3server.controller.controller_error import (
ControllerTimeoutError,
ControllerForbiddenError,
ControllerUnauthorizedError,
ComputeConflictError
)
from gns3server.api.routes import controller, index
@ -138,6 +139,15 @@ async def controller_bad_request_error_handler(request: Request, exc: Controller
)
@app.exception_handler(ComputeConflictError)
async def compute_conflict_error_handler(request: Request, exc: ComputeConflictError):
log.error(f"Controller received error from compute for request '{exc.url()}': {exc}")
return JSONResponse(
status_code=409,
content={"message": str(exc)},
)
# make sure the content key is "message", not "detail" per default
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):

View File

@ -234,69 +234,6 @@ class Qemu(BaseManager):
return os.path.join("qemu", f"vm-{legacy_vm_id}")
async def create_disk(self, qemu_img, path, options):
"""
Create a Qemu disk with qemu-img
:param qemu_img: qemu-img binary path
:param path: Image path
:param options: Disk image creation options
"""
try:
img_format = options.pop("format")
img_size = options.pop("size")
if not os.path.isabs(path):
directory = self.get_images_directory()
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, os.path.basename(path))
try:
if os.path.exists(path):
raise QemuError(f"Could not create disk image '{path}', file already exists")
except UnicodeEncodeError:
raise QemuError(
"Could not create disk image '{}', "
"path contains characters not supported by filesystem".format(path)
)
command = [qemu_img, "create", "-f", img_format]
for option in sorted(options.keys()):
command.extend(["-o", f"{option}={options[option]}"])
command.append(path)
command.append(f"{img_size}M")
print(command)
process = await asyncio.create_subprocess_exec(*command)
await process.wait()
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Could not create disk image {path}:{e}")
async def resize_disk(self, qemu_img, path, extend):
"""
Resize a Qemu disk with qemu-img
:param qemu_img: qemu-img binary path
:param path: Image path
:param size: size
"""
if not os.path.isabs(path):
directory = self.get_images_directory()
os.makedirs(directory, exist_ok=True)
path = os.path.join(directory, os.path.basename(path))
try:
if not os.path.exists(path):
raise QemuError(f"Qemu disk '{path}' does not exist")
command = [qemu_img, "resize", path, f"+{extend}M"]
process = await asyncio.create_subprocess_exec(*command)
await process.wait()
log.info(f"Qemu disk '{path}' extended by {extend} MB")
except (OSError, subprocess.SubprocessError) as e:
raise QemuError(f"Could not update disk image {path}:{e}")
def _init_config_disk(self):
"""
Initialize the default config disk

View File

@ -280,7 +280,7 @@ class QemuVM(BaseNode):
:param value: New disk value
"""
value = self.manager.get_abs_image_path(value, self.project.path)
value = self.manager.get_abs_image_path(value, self.working_dir)
if not self.linked_clone:
for node in self.manager.nodes:
if node != self and getattr(node, variable) == value:
@ -493,7 +493,7 @@ class QemuVM(BaseNode):
"""
if cdrom_image:
self._cdrom_image = self.manager.get_abs_image_path(cdrom_image, self.project.path)
self._cdrom_image = self.manager.get_abs_image_path(cdrom_image, self.working_dir)
log.info(
'QEMU VM "{name}" [{id}] has set the QEMU cdrom image path to {cdrom_image}'.format(
@ -551,7 +551,7 @@ class QemuVM(BaseNode):
:param bios_image: QEMU bios image path
"""
self._bios_image = self.manager.get_abs_image_path(bios_image, self.project.path)
self._bios_image = self.manager.get_abs_image_path(bios_image, self.working_dir)
log.info(
'QEMU VM "{name}" [{id}] has set the QEMU bios image path to {bios_image}'.format(
name=self._name, id=self._id, bios_image=self._bios_image
@ -923,7 +923,7 @@ class QemuVM(BaseNode):
:param initrd: QEMU initrd path
"""
initrd = self.manager.get_abs_image_path(initrd, self.project.path)
initrd = self.manager.get_abs_image_path(initrd, self.working_dir)
log.info(
'QEMU VM "{name}" [{id}] has set the QEMU initrd path to {initrd}'.format(
@ -957,7 +957,7 @@ class QemuVM(BaseNode):
:param kernel_image: QEMU kernel image path
"""
kernel_image = self.manager.get_abs_image_path(kernel_image, self.project.path)
kernel_image = self.manager.get_abs_image_path(kernel_image, self.working_dir)
log.info(
'QEMU VM "{name}" [{id}] has set the QEMU kernel image path to {kernel_image}'.format(
name=self._name, id=self._id, kernel_image=kernel_image
@ -1599,6 +1599,69 @@ class QemuVM(BaseNode):
)
)
async def create_disk_image(self, disk_name, options):
"""
Create a Qemu disk
:param disk_name: disk name
:param options: disk creation options
"""
try:
qemu_img_path = self._get_qemu_img()
img_format = options.pop("format")
img_size = options.pop("size")
disk_path = os.path.join(self.working_dir, disk_name)
try:
if os.path.exists(disk_path):
raise QemuError(f"Could not create disk image '{disk_name}', file already exists")
except UnicodeEncodeError:
raise QemuError(
f"Could not create disk image '{disk_name}', "
"Disk image name contains characters not supported by the filesystem"
)
command = [qemu_img_path, "create", "-f", img_format]
for option in sorted(options.keys()):
command.extend(["-o", f"{option}={options[option]}"])
command.append(disk_path)
command.append(f"{img_size}M")
retcode = await self._qemu_img_exec(command)
if retcode:
stdout = self.read_qemu_img_stdout()
raise QemuError(f"Could not create '{disk_name}' disk image: qemu-img returned with {retcode}\n{stdout}")
else:
log.info(f"QEMU VM '{self.name}' [{self.id}]: Qemu disk image'{disk_name}' created")
except (OSError, subprocess.SubprocessError) as e:
stdout = self.read_qemu_img_stdout()
raise QemuError(f"Could not create '{disk_name}' disk image: {e}\n{stdout}")
async def resize_disk_image(self, disk_name, extend):
"""
Resize a Qemu disk
:param disk_name: disk name
:param extend: new size
"""
try:
qemu_img_path = self._get_qemu_img()
disk_path = os.path.join(self.working_dir, disk_name)
if not os.path.exists(disk_path):
raise QemuError(f"Qemu disk image '{disk_name}' does not exist")
command = [qemu_img_path, "resize", disk_path, f"+{extend}M"]
retcode = await self._qemu_img_exec(command)
if retcode:
stdout = self.read_qemu_img_stdout()
raise QemuError(f"Could not update '{disk_name}' disk image: qemu-img returned with {retcode}\n{stdout}")
else:
log.info(f"QEMU VM '{self.name}' [{self.id}]: Qemu disk image '{disk_name}' extended by {extend} MB")
except (OSError, subprocess.SubprocessError) as e:
stdout = self.read_qemu_img_stdout()
raise QemuError(f"Could not update '{disk_name}' disk image: {e}\n{stdout}")
@property
def started(self):
"""
@ -1791,6 +1854,7 @@ class QemuVM(BaseNode):
*command, stdout=fd, stderr=subprocess.STDOUT, cwd=self.working_dir
)
retcode = await process.wait()
if retcode != 0:
log.info(f"{self._get_qemu_img()} returned with {retcode}")
return retcode
@ -2406,20 +2470,20 @@ class QemuVM(BaseNode):
answer[field] = getattr(self, field)
except AttributeError:
pass
answer["hda_disk_image"] = self.manager.get_relative_image_path(self._hda_disk_image, self.project.path)
answer["hda_disk_image"] = self.manager.get_relative_image_path(self._hda_disk_image, self.working_dir)
answer["hda_disk_image_md5sum"] = md5sum(self._hda_disk_image)
answer["hdb_disk_image"] = self.manager.get_relative_image_path(self._hdb_disk_image, self.project.path)
answer["hdb_disk_image"] = self.manager.get_relative_image_path(self._hdb_disk_image, self.working_dir)
answer["hdb_disk_image_md5sum"] = md5sum(self._hdb_disk_image)
answer["hdc_disk_image"] = self.manager.get_relative_image_path(self._hdc_disk_image, self.project.path)
answer["hdc_disk_image"] = self.manager.get_relative_image_path(self._hdc_disk_image, self.working_dir)
answer["hdc_disk_image_md5sum"] = md5sum(self._hdc_disk_image)
answer["hdd_disk_image"] = self.manager.get_relative_image_path(self._hdd_disk_image, self.project.path)
answer["hdd_disk_image"] = self.manager.get_relative_image_path(self._hdd_disk_image, self.working_dir)
answer["hdd_disk_image_md5sum"] = md5sum(self._hdd_disk_image)
answer["cdrom_image"] = self.manager.get_relative_image_path(self._cdrom_image, self.project.path)
answer["cdrom_image"] = self.manager.get_relative_image_path(self._cdrom_image, self.working_dir)
answer["cdrom_image_md5sum"] = md5sum(self._cdrom_image)
answer["bios_image"] = self.manager.get_relative_image_path(self._bios_image, self.project.path)
answer["bios_image"] = self.manager.get_relative_image_path(self._bios_image, self.working_dir)
answer["bios_image_md5sum"] = md5sum(self._bios_image)
answer["initrd"] = self.manager.get_relative_image_path(self._initrd, self.project.path)
answer["initrd"] = self.manager.get_relative_image_path(self._initrd, self.working_dir)
answer["initrd_md5sum"] = md5sum(self._initrd)
answer["kernel_image"] = self.manager.get_relative_image_path(self._kernel_image, self.project.path)
answer["kernel_image"] = self.manager.get_relative_image_path(self._kernel_image, self.working_dir)
answer["kernel_image_md5sum"] = md5sum(self._kernel_image)
return answer

View File

@ -30,10 +30,13 @@ from ..utils import parse_version
from ..utils.asyncio import locking
from ..controller.controller_error import (
ControllerError,
ControllerBadRequestError,
ControllerNotFoundError,
ControllerForbiddenError,
ControllerTimeoutError,
ControllerUnauthorizedError,
ComputeError,
ComputeConflictError
)
from ..version import __version__, __version_info__
@ -43,23 +46,6 @@ import logging
log = logging.getLogger(__name__)
class ComputeError(ControllerError):
pass
# FIXME: broken
class ComputeConflict(ComputeError):
"""
Raise when the compute send a 409 that we can handle
:param response: The response of the compute
"""
def __init__(self, response):
super().__init__(response["message"])
self.response = response
class Compute:
"""
A GNS3 compute.
@ -574,7 +560,9 @@ class Compute:
else:
msg = ""
if response.status == 401:
if response.status == 400:
raise ControllerBadRequestError(msg)
elif response.status == 401:
raise ControllerUnauthorizedError(f"Invalid authentication for compute '{self.name}' [{self.id}]")
elif response.status == 403:
raise ControllerForbiddenError(msg)
@ -584,7 +572,7 @@ class Compute:
raise ControllerTimeoutError(f"{method} {path} request timeout")
elif response.status == 409:
try:
raise ComputeConflict(json.loads(body))
raise ComputeConflictError(url, json.loads(body))
# If the 409 doesn't come from a GNS3 server
except ValueError:
raise ControllerError(msg)
@ -593,7 +581,7 @@ class Compute:
elif response.status == 503:
raise aiohttp.web.HTTPServiceUnavailable(text=f"Service unavailable {url} {body}")
else:
raise NotImplementedError(f"{response.status} status code is not supported for {method} '{url}'")
raise NotImplementedError(f"{response.status} status code is not supported for {method} '{url}'\n{body}")
if body and len(body):
if raw:
response.body = body

View File

@ -51,3 +51,24 @@ class ControllerForbiddenError(ControllerError):
class ControllerTimeoutError(ControllerError):
def __init__(self, message: str):
super().__init__(message)
class ComputeError(ControllerError):
pass
class ComputeConflictError(ComputeError):
"""
Raise when the compute sends a 409 that we can handle
:param request URL: compute URL used for the request
:param response: compute JSON response
"""
def __init__(self, url, response):
super().__init__(response["message"])
self._url = url
self._response = response
def url(self):
return self._url

View File

@ -21,8 +21,11 @@ import copy
import uuid
import os
from .compute import ComputeConflict, ComputeError
from .controller_error import ControllerError, ControllerTimeoutError
from .controller_error import (
ControllerError,
ControllerTimeoutError,
ComputeError
)
from .ports.port_factory import PortFactory, StandardPortFactory, DynamipsPortFactory
from ..utils.images import images_directories
from ..config import Config

View File

@ -73,9 +73,12 @@ from .compute.dynamips_nodes import DynamipsCreate, DynamipsUpdate, Dynamips
from .compute.ethernet_hub_nodes import EthernetHubCreate, EthernetHubUpdate, EthernetHub
from .compute.ethernet_switch_nodes import EthernetSwitchCreate, EthernetSwitchUpdate, EthernetSwitch
from .compute.frame_relay_switch_nodes import FrameRelaySwitchCreate, FrameRelaySwitchUpdate, FrameRelaySwitch
from .compute.qemu_nodes import QemuCreate, QemuUpdate, QemuImageCreate, QemuImageUpdate, QemuDiskResize, Qemu
from .compute.qemu_nodes import QemuCreate, QemuUpdate, Qemu
from .compute.iou_nodes import IOUCreate, IOUUpdate, IOUStart, IOU
from .compute.nat_nodes import NATCreate, NATUpdate, NAT
from .compute.vpcs_nodes import VPCSCreate, VPCSUpdate, VPCS
from .compute.vmware_nodes import VMwareCreate, VMwareUpdate, VMware
from .compute.virtualbox_nodes import VirtualBoxCreate, VirtualBoxUpdate, VirtualBox
# Schemas for both controller and compute
from .qemu_disk_image import QemuDiskImageCreate, QemuDiskImageUpdate

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pydantic import BaseModel, Field
from typing import Optional, Union
from typing import Optional
from enum import Enum

View File

@ -232,113 +232,7 @@ class Qemu(QemuBase):
status: NodeStatus = Field(..., description="Container status (read only)")
class QemuDriveName(str, Enum):
"""
Supported Qemu drive names.
"""
hda = "hda"
hdb = "hdb"
hdc = "hdc"
hdd = "hdd"
class QemuDiskResize(BaseModel):
"""
Properties to resize a Qemu disk.
"""
drive_name: QemuDriveName = Field(..., description="Qemu drive name")
extend: int = Field(..., description="Number of Megabytes to extend the image")
class QemuBinaryPath(BaseModel):
path: str
version: str
class QemuImageFormat(str, Enum):
"""
Supported Qemu image formats.
"""
qcow2 = "qcow2"
qcow = "qcow"
vpc = "vpc"
vdi = "vdi"
vdmk = "vdmk"
raw = "raw"
class QemuImagePreallocation(str, Enum):
"""
Supported Qemu image preallocation options.
"""
off = "off"
metadata = "metadata"
falloc = "falloc"
full = "full"
class QemuImageOnOff(str, Enum):
"""
Supported Qemu image on/off options.
"""
on = "off"
off = "off"
class QemuImageSubformat(str, Enum):
"""
Supported Qemu image preallocation options.
"""
dynamic = "dynamic"
fixed = "fixed"
stream_optimized = "streamOptimized"
two_gb_max_extent_sparse = "twoGbMaxExtentSparse"
two_gb_max_extent_flat = "twoGbMaxExtentFlat"
monolithic_sparse = "monolithicSparse"
monolithic_flat = "monolithicFlat"
class QemuImageAdapterType(str, Enum):
"""
Supported Qemu image on/off options.
"""
ide = "ide"
lsilogic = "lsilogic"
buslogic = "buslogic"
legacy_esx = "legacyESX"
class QemuImageBase(BaseModel):
qemu_img: str = Field(..., description="Path to the qemu-img binary")
path: str = Field(..., description="Absolute or relative path of the image")
format: QemuImageFormat = Field(..., description="Image format type")
size: int = Field(..., description="Image size in Megabytes")
preallocation: Optional[QemuImagePreallocation]
cluster_size: Optional[int]
refcount_bits: Optional[int]
lazy_refcounts: Optional[QemuImageOnOff]
subformat: Optional[QemuImageSubformat]
static: Optional[QemuImageOnOff]
zeroed_grain: Optional[QemuImageOnOff]
adapter_type: Optional[QemuImageAdapterType]
class QemuImageCreate(QemuImageBase):
pass
class QemuImageUpdate(QemuImageBase):
format: Optional[QemuImageFormat] = Field(None, description="Image format type")
size: Optional[int] = Field(None, description="Image size in Megabytes")
extend: Optional[int] = Field(None, description="Number of Megabytes to extend the image")

View File

@ -0,0 +1,103 @@
#
# Copyright (C) 2022 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
class QemuDiskImageFormat(str, Enum):
"""
Supported Qemu disk image formats.
"""
qcow2 = "qcow2"
qcow = "qcow"
vpc = "vpc"
vdi = "vdi"
vdmk = "vdmk"
raw = "raw"
class QemuDiskImagePreallocation(str, Enum):
"""
Supported Qemu disk image pre-allocation options.
"""
off = "off"
metadata = "metadata"
falloc = "falloc"
full = "full"
class QemuDiskImageOnOff(str, Enum):
"""
Supported Qemu image on/off options.
"""
on = "on"
off = "off"
class QemuDiskImageSubformat(str, Enum):
"""
Supported Qemu disk image sub-format options.
"""
dynamic = "dynamic"
fixed = "fixed"
stream_optimized = "streamOptimized"
two_gb_max_extent_sparse = "twoGbMaxExtentSparse"
two_gb_max_extent_flat = "twoGbMaxExtentFlat"
monolithic_sparse = "monolithicSparse"
monolithic_flat = "monolithicFlat"
class QemuDiskImageAdapterType(str, Enum):
"""
Supported Qemu disk image on/off options.
"""
ide = "ide"
lsilogic = "lsilogic"
buslogic = "buslogic"
legacy_esx = "legacyESX"
class QemuDiskImageBase(BaseModel):
format: QemuDiskImageFormat = Field(..., description="Image format type")
size: int = Field(..., description="Image size in Megabytes")
preallocation: Optional[QemuDiskImagePreallocation]
cluster_size: Optional[int]
refcount_bits: Optional[int]
lazy_refcounts: Optional[QemuDiskImageOnOff]
subformat: Optional[QemuDiskImageSubformat]
static: Optional[QemuDiskImageOnOff]
zeroed_grain: Optional[QemuDiskImageOnOff]
adapter_type: Optional[QemuDiskImageAdapterType]
class QemuDiskImageCreate(QemuDiskImageBase):
pass
class QemuDiskImageUpdate(QemuDiskImageBase):
format: Optional[QemuDiskImageFormat] = Field(None, description="Image format type")
size: Optional[int] = Field(None, description="Image size in Megabytes")
extend: Optional[int] = Field(None, description="Number of Megabytes to extend the image")

View File

@ -17,13 +17,13 @@
import pytest
import os
import sys
import stat
import shutil
from fastapi import FastAPI, status
from httpx import AsyncClient
from tests.utils import asyncio_patch
from unittest.mock import patch
from unittest.mock import patch, MagicMock
from gns3server.compute.project import Project
@ -52,6 +52,16 @@ def fake_qemu_vm(images_dir) -> str:
return bin_path
@pytest.fixture
def fake_qemu_img_binary(tmpdir):
bin_path = str(tmpdir / "qemu-img")
with open(bin_path, "w+") as f:
f.write("1")
os.chmod(bin_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
return bin_path
@pytest.fixture
def base_params(tmpdir, fake_qemu_bin) -> dict:
"""Return standard parameters"""
@ -60,9 +70,12 @@ def base_params(tmpdir, fake_qemu_bin) -> dict:
@pytest.fixture
async def vm(app: FastAPI, compute_client: AsyncClient, compute_project: Project, base_params: dict) -> None:
async def qemu_vm(app: FastAPI, compute_client: AsyncClient, compute_project: Project, base_params: dict) -> None:
response = await compute_client.post(app.url_path_for("compute:create_qemu_node", project_id=compute_project.id), json=base_params)
response = await compute_client.post(
app.url_path_for("compute:create_qemu_node", project_id=compute_project.id),
json=base_params
)
assert response.status_code == status.HTTP_201_CREATED
return response.json()
@ -116,99 +129,107 @@ async def test_qemu_create_with_params(app: FastAPI,
assert response.json()["hda_disk_image_md5sum"] == "c4ca4238a0b923820dcc509a6f75849b"
async def test_qemu_create_with_project_file(app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
base_params: dict,
fake_qemu_vm: str) -> None:
response = await compute_client.post(app.url_path_for("compute:write_compute_project_file",
project_id=compute_project.id,
file_path="hello.img"), content=b"world")
assert response.status_code == status.HTTP_204_NO_CONTENT
params = base_params
params["hda_disk_image"] = "hello.img"
response = await compute_client.post(app.url_path_for("compute:create_qemu_node", project_id=compute_project.id), json=params)
assert response.status_code == status.HTTP_201_CREATED
assert response.json()["hda_disk_image"] == "hello.img"
assert response.json()["hda_disk_image_md5sum"] == "7d793037a0760186574b0282f2f435e7"
# async def test_qemu_create_with_project_file(app: FastAPI,
# compute_client: AsyncClient,
# compute_project: Project,
# base_params: dict,
# fake_qemu_vm: str) -> None:
#
# response = await compute_client.post(
# app.url_path_for("compute:write_compute_project_file", project_id=compute_project.id, file_path="hello.img"),
# content=b"world"
# )
# assert response.status_code == status.HTTP_204_NO_CONTENT
# params = base_params
# params["hda_disk_image"] = "hello.img"
# response = await compute_client.post(
# app.url_path_for("compute:create_qemu_node", project_id=compute_project.id),
# json=params
# )
# assert response.status_code == status.HTTP_201_CREATED
# assert response.json()["hda_disk_image"] == "hello.img"
# assert response.json()["hda_disk_image_md5sum"] == "7d793037a0760186574b0282f2f435e7"
async def test_qemu_get(app: FastAPI, compute_client: AsyncClient, compute_project: Project, vm: dict):
async def test_qemu_get(app: FastAPI, compute_client: AsyncClient, compute_project: Project, qemu_vm: dict):
response = await compute_client.get(app.url_path_for("compute:get_qemu_node", project_id=vm["project_id"], node_id=vm["node_id"]))
response = await compute_client.get(
app.url_path_for("compute:get_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["name"] == "PC TEST 1"
assert response.json()["project_id"] == compute_project.id
assert response.json()["node_directory"] == os.path.join(compute_project.path,
assert response.json()["node_directory"] == os.path.join(
compute_project.path,
"project-files",
"qemu",
vm["node_id"])
qemu_vm["node_id"]
)
async def test_qemu_start(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_start(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.start", return_value=True) as mock:
response = await compute_client.post(app.url_path_for("compute:start_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.post(
app.url_path_for("compute:start_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_stop(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_stop(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.stop", return_value=True) as mock:
response = await compute_client.post(app.url_path_for("compute:stop_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.post(
app.url_path_for("compute:stop_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_reload(app: FastAPI, compute_client: AsyncClient, vm) -> None:
async def test_qemu_reload(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.reload", return_value=True) as mock:
response = await compute_client.post(app.url_path_for("compute:reload_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.post(
app.url_path_for("compute:reload_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_suspend(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_suspend(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.suspend", return_value=True) as mock:
response = await compute_client.post(app.url_path_for("compute:suspend_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.post(
app.url_path_for("compute:suspend_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_resume(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_resume(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.resume", return_value=True) as mock:
response = await compute_client.post(app.url_path_for("compute:resume_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.post(
app.url_path_for("compute:resume_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_delete(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_delete(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
with asyncio_patch("gns3server.compute.qemu.Qemu.delete_node", return_value=True) as mock:
response = await compute_client.delete(app.url_path_for("compute:delete_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]))
response = await compute_client.delete(
app.url_path_for("compute:delete_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"])
)
assert mock.called
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_update(app: FastAPI,
compute_client: AsyncClient,
vm: dict,
qemu_vm: dict,
free_console_port: int,
fake_qemu_vm: str) -> None:
@ -219,9 +240,10 @@ async def test_qemu_update(app: FastAPI,
"hdb_disk_image": "linux载.img"
}
response = await compute_client.put(app.url_path_for("compute:update_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]), json=params)
response = await compute_client.put(
app.url_path_for("compute:update_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"]),
json=params
)
assert response.status_code == status.HTTP_200_OK
assert response.json()["name"] == "test"
assert response.json()["console"] == free_console_port
@ -229,7 +251,7 @@ async def test_qemu_update(app: FastAPI,
assert response.json()["ram"] == 1024
async def test_qemu_nio_create_udp(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_nio_create_udp(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
params = {
"type": "nio_udp",
@ -239,21 +261,25 @@ async def test_qemu_nio_create_udp(app: FastAPI, compute_client: AsyncClient, vm
}
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.add_ubridge_udp_connection"):
await compute_client.put(app.url_path_for("compute:update_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]), json={"adapters": 2})
await compute_client.put(
app.url_path_for("compute:update_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"]),
json={"adapters": 2}
)
url = app.url_path_for("compute:create_qemu_node_nio",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:create_qemu_node_nio",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="1",
port_number="0")
port_number="0"
)
response = await compute_client.post(url, json=params)
assert response.status_code == status.HTTP_201_CREATED
assert response.json()["type"] == "nio_udp"
async def test_qemu_nio_update_udp(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_nio_update_udp(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
params = {
"type": "nio_udp",
@ -262,31 +288,35 @@ async def test_qemu_nio_update_udp(app: FastAPI, compute_client: AsyncClient, vm
"rhost": "127.0.0.1"
}
await compute_client.put(app.url_path_for("compute:update_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]), json={"adapters": 2})
await compute_client.put(
app.url_path_for("compute:update_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"]),
json={"adapters": 2}
)
url = app.url_path_for("compute:create_qemu_node_nio",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:create_qemu_node_nio",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="1",
port_number="0")
port_number="0"
)
await compute_client.post(url, json=params)
params["filters"] = {}
url = app.url_path_for("compute:update_qemu_node_nio",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:update_qemu_node_nio",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="1",
port_number="0")
port_number="0"
)
response = await compute_client.put(url, json=params)
assert response.status_code == status.HTTP_201_CREATED
assert response.json()["type"] == "nio_udp"
async def test_qemu_delete_nio(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_delete_nio(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict) -> None:
params = {
"type": "nio_udp",
@ -296,27 +326,32 @@ async def test_qemu_delete_nio(app: FastAPI, compute_client: AsyncClient, vm: di
}
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM._ubridge_send"):
await compute_client.put(app.url_path_for("compute:update_qemu_node",
project_id=vm["project_id"],
node_id=vm["node_id"]), json={"adapters": 2})
await compute_client.put(
app.url_path_for("compute:update_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"]),
json={"adapters": 2}
)
url = app.url_path_for("compute:create_qemu_node_nio",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:create_qemu_node_nio",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="1",
port_number="0")
port_number="0"
)
await compute_client.post(url, json=params)
url = app.url_path_for("compute:delete_qemu_node_nio",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:delete_qemu_node_nio",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="1",
port_number="0")
port_number="0"
)
response = await compute_client.delete(url)
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_list_binaries(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
async def test_qemu_list_binaries(app: FastAPI, compute_client: AsyncClient) -> None:
ret = [{"path": "/tmp/1", "version": "2.2.0"},
{"path": "/tmp/2", "version": "2.1.0"}]
@ -480,33 +515,177 @@ async def test_capabilities(app: FastAPI, compute_client: AsyncClient) -> None:
async def test_qemu_duplicate(app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
vm: dict,
qemu_vm: dict,
base_params: dict) -> None:
# create destination node first
response = await compute_client.post(app.url_path_for("compute:create_qemu_node",
project_id=vm["project_id"]), json=base_params)
response = await compute_client.post(
app.url_path_for("compute:create_qemu_node", project_id=qemu_vm["project_id"]),
json=base_params
)
assert response.status_code == status.HTTP_201_CREATED
params = {"destination_node_id": response.json()["node_id"]}
response = await compute_client.post(app.url_path_for("compute:duplicate_qemu_node",
project_id=vm["project_id"], node_id=vm["node_id"]), json=params)
response = await compute_client.post(
app.url_path_for("compute:duplicate_qemu_node", project_id=qemu_vm["project_id"], node_id=qemu_vm["node_id"]),
json=params
)
assert response.status_code == status.HTTP_201_CREATED
async def test_qemu_create_disk_image(
app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
fake_qemu_img_binary: str,
qemu_vm: dict,
):
options = {
"format": "qcow2",
"preallocation": "metadata",
"cluster_size": 64,
"refcount_bits": 12,
"lazy_refcounts": "off",
"size": 30
}
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as qemu_img:
response = await compute_client.post(
app.url_path_for(
"compute:create_qemu_disk_image",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
disk_name="disk.qcow2"
),
json=options
)
assert response.status_code == status.HTTP_204_NO_CONTENT
args, kwargs = qemu_img.call_args
assert args == (
fake_qemu_img_binary,
"create",
"-f",
"qcow2",
"-o",
"cluster_size=64",
"-o",
"lazy_refcounts=off",
"-o",
"preallocation=metadata",
"-o",
"refcount_bits=12",
os.path.join(qemu_vm["node_directory"], "disk.qcow2"),
"30M"
)
async def test_qemu_create_disk_image_already_exists(
app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
fake_qemu_img_binary: str,
qemu_vm: dict,
):
node = compute_project.get_node(qemu_vm["node_id"])
shutil.copy("tests/resources/empty8G.qcow2", os.path.join(node.working_dir, "disk.qcow2"))
options = {
"format": "qcow2",
"size": 100
}
response = await compute_client.post(
app.url_path_for(
"compute:create_qemu_disk_image",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
disk_name="disk.qcow2"
),
json=options
)
assert response.status_code == status.HTTP_409_CONFLICT
# async def test_qemu_create_disk_image_with_not_supported_characters_by_filesystem(
# app: FastAPI,
# compute_client: AsyncClient,
# compute_project: Project,
# fake_qemu_img_binary: str,
# qemu_vm: dict,
# ):
#
# node = compute_project.get_node(qemu_vm["node_id"])
# shutil.copy("tests/resources/empty8G.qcow2", os.path.join(node.working_dir, "disk.qcow2"))
#
# options = {
# "format": "qcow2",
# "size": 100
# }
#
# with patch("os.path.exists", side_effect=UnicodeEncodeError('error', u"", 1, 2, 'Emulated Unicode Err')):
# response = await compute_client.post(
# app.url_path_for(
# "compute:create_qemu_disk_image",
# project_id=qemu_vm["project_id"],
# node_id=qemu_vm["node_id"],
# disk_name=u"\u2019"
# ),
# json=options
# )
# assert response.status_code == status.HTTP_409_CONFLICT
async def test_qemu_update_disk_image(
app: FastAPI,
compute_client: AsyncClient,
compute_project: Project,
fake_qemu_img_binary: str,
qemu_vm: dict,
) -> None:
node = compute_project.get_node(qemu_vm["node_id"])
shutil.copy("tests/resources/empty8G.qcow2", os.path.join(node.working_dir, "disk.qcow2"))
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as qemu_img:
response = await compute_client.put(
app.url_path_for(
"compute:update_qemu_disk_image",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
disk_name="disk.qcow2"
),
json={"extend": 10}
)
assert response.status_code == status.HTTP_204_NO_CONTENT
assert qemu_img.called
args, kwargs = qemu_img.call_args
assert args == (
fake_qemu_img_binary,
"resize",
os.path.join(qemu_vm["node_directory"], "disk.qcow2"),
"+10M"
)
@pytest.mark.asyncio
async def test_qemu_start_capture(app: FastAPI, compute_client: AsyncClient, vm):
async def test_qemu_start_capture(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict):
params = {
"capture_file_name": "test.pcap",
"data_link_type": "DLT_EN10MB"
}
url = app.url_path_for("compute:start_qemu_node_capture",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:start_qemu_node_capture",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="0",
port_number="0")
port_number="0"
)
with patch("gns3server.compute.qemu.qemu_vm.QemuVM.is_running", return_value=True):
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.start_capture") as mock:
@ -517,13 +696,15 @@ async def test_qemu_start_capture(app: FastAPI, compute_client: AsyncClient, vm)
@pytest.mark.asyncio
async def test_qemu_stop_capture(app: FastAPI, compute_client: AsyncClient, vm):
async def test_qemu_stop_capture(app: FastAPI, compute_client: AsyncClient, qemu_vm: dict):
url = app.url_path_for("compute:stop_qemu_node_capture",
project_id=vm["project_id"],
node_id=vm["node_id"],
url = app.url_path_for(
"compute:stop_qemu_node_capture",
project_id=qemu_vm["project_id"],
node_id=qemu_vm["node_id"],
adapter_number="0",
port_number="0")
port_number="0"
)
with patch("gns3server.compute.qemu.qemu_vm.QemuVM.is_running", return_value=True):
with asyncio_patch("gns3server.compute.qemu.qemu_vm.QemuVM.stop_capture") as mock:

View File

@ -223,33 +223,129 @@ async def test_dynamips_idle_pc(
client: AsyncClient,
project: Project,
compute: Compute,
node: Node) -> None:
node: Node
) -> None:
response = MagicMock()
response.json = {"idlepc": "0x60606f54"}
compute.get = AsyncioMagicMock(return_value=response)
node._node_type = "dynamips" # force Dynamips node type
response = await client.get(app.url_path_for("auto_idlepc", project_id=project.id, node_id=node.id))
assert response.status_code == status.HTTP_200_OK
assert response.json()["idlepc"] == "0x60606f54"
async def test_dynamips_idle_pc_wrong_node_type(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = await client.get(app.url_path_for("auto_idlepc", project_id=project.id, node_id=node.id))
assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_dynamips_idlepc_proposals(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node) -> None:
node: Node
) -> None:
response = MagicMock()
response.json = ["0x60606f54", "0x33805a22"]
compute.get = AsyncioMagicMock(return_value=response)
node._node_type = "dynamips" # force Dynamips node type
response = await client.get(app.url_path_for("idlepc_proposals", project_id=project.id, node_id=node.id))
assert response.status_code == status.HTTP_200_OK
assert response.json() == ["0x60606f54", "0x33805a22"]
async def test_dynamips_idlepc_proposals_wrong_node_type(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = await client.get(app.url_path_for("idlepc_proposals", project_id=project.id, node_id=node.id))
assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_qemu_disk_image_create(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = MagicMock()
compute.post = AsyncioMagicMock(return_value=response)
node._node_type = "qemu" # force Qemu node type
response = await client.post(
app.url_path_for("create_disk_image", project_id=project.id, node_id=node.id, disk_name="hda_disk.qcow2"),
json={"format": "qcow2", "size": 30}
)
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_disk_image_create_wrong_node_type(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = await client.post(
app.url_path_for("create_disk_image", project_id=project.id, node_id=node.id, disk_name="hda_disk.qcow2"),
json={"format": "qcow2", "size": 30}
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_qemu_disk_image_update(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = MagicMock()
compute.put = AsyncioMagicMock(return_value=response)
node._node_type = "qemu" # force Qemu node type
response = await client.put(
app.url_path_for("update_disk_image", project_id=project.id, node_id=node.id, disk_name="hda_disk.qcow2"),
json={"extend": 10}
)
assert response.status_code == status.HTTP_204_NO_CONTENT
async def test_qemu_disk_image_update_wrong_node_type(
app: FastAPI,
client: AsyncClient,
project: Project,
compute: Compute,
node: Node
) -> None:
response = await client.put(
app.url_path_for("update_disk_image", project_id=project.id, node_id=node.id, disk_name="hda_disk.qcow2"),
json={"extend": 10}
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
async def test_get_file(app: FastAPI, client: AsyncClient, project: Project, compute: Compute, node: Node) -> None:
response = MagicMock()

View File

@ -17,7 +17,6 @@
import os
import stat
import sys
import pytest
import platform
@ -113,95 +112,6 @@ def test_get_legacy_vm_workdir():
assert Qemu.get_legacy_vm_workdir(42, "bla") == os.path.join("qemu", "vm-42")
@pytest.mark.asyncio
async def test_create_image_abs_path(tmpdir, fake_qemu_img_binary):
options = {
"format": "qcow2",
"preallocation": "metadata",
"cluster_size": 64,
"refcount_bits": 12,
"lazy_refcounts": "off",
"size": 100
}
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
await Qemu.instance().create_disk(fake_qemu_img_binary, str(tmpdir / "hda.qcow2"), options)
args, kwargs = process.call_args
assert args == (
fake_qemu_img_binary,
"create",
"-f",
"qcow2",
"-o",
"cluster_size=64",
"-o",
"lazy_refcounts=off",
"-o",
"preallocation=metadata",
"-o",
"refcount_bits=12",
str(tmpdir / "hda.qcow2"),
"100M"
)
@pytest.mark.asyncio
async def test_create_image_relative_path(tmpdir, fake_qemu_img_binary):
options = {
"format": "raw",
"size": 100
}
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
with patch("gns3server.compute.qemu.Qemu.get_images_directory", return_value=str(tmpdir)):
await Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)
args, kwargs = process.call_args
assert args == (
fake_qemu_img_binary,
"create",
"-f",
"raw",
str(tmpdir / "hda.qcow2"),
"100M"
)
@pytest.mark.asyncio
async def test_create_image_exist(tmpdir, fake_qemu_img_binary):
open(str(tmpdir / "hda.qcow2"), "w+").close()
options = {
"format": "raw",
"size": 100
}
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
with patch("gns3server.compute.qemu.Qemu.get_images_directory", return_value=str(tmpdir)):
with pytest.raises(QemuError):
await Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)
assert not process.called
@pytest.mark.asyncio
async def test_create_image_with_not_supported_characters_by_filesystem(tmpdir, fake_qemu_img_binary):
open(str(tmpdir / "hda.qcow2"), "w+").close()
options = {
"format": "raw",
"size": 100
}
# patching os.makedirs is necessary as it depends on already mocked os.path.exists
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process, \
patch("gns3server.compute.qemu.Qemu.get_images_directory", return_value=str(tmpdir)), \
patch("os.path.exists", side_effect=UnicodeEncodeError('error', u"", 1, 2, 'Emulated Unicode Err')),\
patch("os.makedirs"):
with pytest.raises(QemuError):
await Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)
assert not process.called
@pytest.mark.asyncio
async def test_get_kvm_archs_kvm_ok():

View File

@ -18,7 +18,6 @@
import pytest
import asyncio
import os
import sys
import stat
from tests.utils import asyncio_patch, AsyncioMagicMock

View File

@ -20,8 +20,8 @@ import pytest
from unittest.mock import patch, MagicMock
from gns3server.controller.project import Project
from gns3server.controller.compute import Compute, ComputeConflict
from gns3server.controller.controller_error import ControllerError, ControllerNotFoundError
from gns3server.controller.compute import Compute
from gns3server.controller.controller_error import ControllerError, ControllerNotFoundError, ComputeConflictError
from pydantic import SecretStr
from tests.utils import asyncio_patch, AsyncioMagicMock
@ -212,7 +212,7 @@ async def test_compute_httpQueryConflictError(compute):
with asyncio_patch("aiohttp.ClientSession.request", return_value=response) as mock:
response.status = 409
response.read = AsyncioMagicMock(return_value=b'{"message": "Test"}')
with pytest.raises(ComputeConflict):
with pytest.raises(ComputeConflictError):
await compute.post("/projects", {"a": "b"})
assert mock.called
await compute.close()