mirror of
https://github.com/GNS3/gns3-server.git
synced 2025-02-13 03:33:48 +02:00
Merge remote-tracking branch 'origin/3.0' into gh-pages
This commit is contained in:
commit
7934dc60cf
@ -121,20 +121,6 @@ def compute_statistics() -> dict:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/qemu/binaries")
|
|
||||||
async def get_qemu_binaries(
|
|
||||||
archs: Optional[List[str]] = Body(None, embed=True)
|
|
||||||
) -> List[str]:
|
|
||||||
|
|
||||||
return await Qemu.binary_list(archs)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/qemu/img-binaries")
|
|
||||||
async def get_image_binaries() -> List[str]:
|
|
||||||
|
|
||||||
return await Qemu.img_binary_list()
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/qemu/capabilities")
|
@router.get("/qemu/capabilities")
|
||||||
async def get_qemu_capabilities() -> dict:
|
async def get_qemu_capabilities() -> dict:
|
||||||
capabilities = {"kvm": []}
|
capabilities = {"kvm": []}
|
||||||
|
@ -115,18 +115,50 @@ async def delete_compute(
|
|||||||
return Response(status_code=status.HTTP_204_NO_CONTENT)
|
return Response(status_code=status.HTTP_204_NO_CONTENT)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{compute_id}/{emulator}/images")
|
@router.get("/{compute_id}/docker/images", response_model=List[schemas.ComputeDockerImage])
|
||||||
async def get_images(compute_id: Union[str, UUID], emulator: str) -> List[str]:
|
async def docker_get_images(compute_id: Union[str, UUID]) -> List[schemas.ComputeDockerImage]:
|
||||||
"""
|
"""
|
||||||
Return the list of images available on a compute for a given emulator type.
|
Get Docker images from a compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
compute = Controller.instance().get_compute(str(compute_id))
|
||||||
|
result = await compute.forward("GET", "docker", "images")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{compute_id}/virtualbox/vms", response_model=List[schemas.ComputeVirtualBoxVM])
|
||||||
|
async def virtualbox_vms(compute_id: Union[str, UUID]) -> List[schemas.ComputeVirtualBoxVM]:
|
||||||
|
"""
|
||||||
|
Get VirtualBox VMs from a compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
compute = Controller.instance().get_compute(str(compute_id))
|
||||||
|
result = await compute.forward("GET", "virtualbox", "vms")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{compute_id}/vmware/vms", response_model=List[schemas.ComputeVMwareVM])
|
||||||
|
async def vmware_vms(compute_id: Union[str, UUID]) -> List[schemas.ComputeVMwareVM]:
|
||||||
|
"""
|
||||||
|
Get VMware VMs from a compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
compute = Controller.instance().get_compute(str(compute_id))
|
||||||
|
result = await compute.forward("GET", "vmware", "vms")
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/{compute_id}/dynamips/auto_idlepc")
|
||||||
|
async def dynamips_autoidlepc(compute_id: Union[str, UUID], auto_idle_pc: schemas.AutoIdlePC) -> str:
|
||||||
|
"""
|
||||||
|
Find a suitable Idle-PC value for a given IOS image. This may take a few minutes.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
controller = Controller.instance()
|
controller = Controller.instance()
|
||||||
compute = controller.get_compute(str(compute_id))
|
return await controller.autoidlepc(str(compute_id), auto_idle_pc.platform, auto_idle_pc.image, auto_idle_pc.ram)
|
||||||
return await compute.images(emulator)
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{compute_id}/{emulator}/{endpoint_path:path}")
|
@router.get("/{compute_id}/{emulator}/{endpoint_path:path}", deprecated=True)
|
||||||
async def forward_get(compute_id: Union[str, UUID], emulator: str, endpoint_path: str) -> dict:
|
async def forward_get(compute_id: Union[str, UUID], emulator: str, endpoint_path: str) -> dict:
|
||||||
"""
|
"""
|
||||||
Forward a GET request to a compute.
|
Forward a GET request to a compute.
|
||||||
@ -138,7 +170,7 @@ async def forward_get(compute_id: Union[str, UUID], emulator: str, endpoint_path
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
@router.post("/{compute_id}/{emulator}/{endpoint_path:path}")
|
@router.post("/{compute_id}/{emulator}/{endpoint_path:path}", deprecated=True)
|
||||||
async def forward_post(compute_id: Union[str, UUID], emulator: str, endpoint_path: str, compute_data: dict) -> dict:
|
async def forward_post(compute_id: Union[str, UUID], emulator: str, endpoint_path: str, compute_data: dict) -> dict:
|
||||||
"""
|
"""
|
||||||
Forward a POST request to a compute.
|
Forward a POST request to a compute.
|
||||||
@ -149,7 +181,7 @@ async def forward_post(compute_id: Union[str, UUID], emulator: str, endpoint_pat
|
|||||||
return await compute.forward("POST", emulator, endpoint_path, data=compute_data)
|
return await compute.forward("POST", emulator, endpoint_path, data=compute_data)
|
||||||
|
|
||||||
|
|
||||||
@router.put("/{compute_id}/{emulator}/{endpoint_path:path}")
|
@router.put("/{compute_id}/{emulator}/{endpoint_path:path}", deprecated=True)
|
||||||
async def forward_put(compute_id: Union[str, UUID], emulator: str, endpoint_path: str, compute_data: dict) -> dict:
|
async def forward_put(compute_id: Union[str, UUID], emulator: str, endpoint_path: str, compute_data: dict) -> dict:
|
||||||
"""
|
"""
|
||||||
Forward a PUT request to a compute.
|
Forward a PUT request to a compute.
|
||||||
@ -158,13 +190,3 @@ async def forward_put(compute_id: Union[str, UUID], emulator: str, endpoint_path
|
|||||||
|
|
||||||
compute = Controller.instance().get_compute(str(compute_id))
|
compute = Controller.instance().get_compute(str(compute_id))
|
||||||
return await compute.forward("PUT", emulator, endpoint_path, data=compute_data)
|
return await compute.forward("PUT", emulator, endpoint_path, data=compute_data)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/{compute_id}/auto_idlepc")
|
|
||||||
async def autoidlepc(compute_id: Union[str, UUID], auto_idle_pc: schemas.AutoIdlePC) -> str:
|
|
||||||
"""
|
|
||||||
Find a suitable Idle-PC value for a given IOS image. This may take a few minutes.
|
|
||||||
"""
|
|
||||||
|
|
||||||
controller = Controller.instance()
|
|
||||||
return await controller.autoidlepc(str(compute_id), auto_idle_pc.platform, auto_idle_pc.image, auto_idle_pc.ram)
|
|
||||||
|
@ -129,7 +129,7 @@ async def get_image(
|
|||||||
async def delete_image(
|
async def delete_image(
|
||||||
image_path: str,
|
image_path: str,
|
||||||
images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
|
images_repo: ImagesRepository = Depends(get_repository(ImagesRepository)),
|
||||||
) -> None:
|
) -> Response:
|
||||||
"""
|
"""
|
||||||
Delete an image.
|
Delete an image.
|
||||||
"""
|
"""
|
||||||
@ -159,6 +159,8 @@ async def delete_image(
|
|||||||
if not success:
|
if not success:
|
||||||
raise ControllerError(f"Image '{image_path}' could not be deleted")
|
raise ControllerError(f"Image '{image_path}' could not be deleted")
|
||||||
|
|
||||||
|
return Response(status_code=status.HTTP_204_NO_CONTENT)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/prune", status_code=status.HTTP_204_NO_CONTENT)
|
@router.post("/prune", status_code=status.HTTP_204_NO_CONTENT)
|
||||||
async def prune_images(
|
async def prune_images(
|
||||||
|
@ -159,30 +159,6 @@ class Qemu(BaseManager):
|
|||||||
|
|
||||||
return qemus
|
return qemus
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
async def img_binary_list():
|
|
||||||
"""
|
|
||||||
Gets QEMU-img binaries list available on the host.
|
|
||||||
|
|
||||||
:returns: Array of dictionary {"path": Qemu-img binary path, "version": version of Qemu-img}
|
|
||||||
"""
|
|
||||||
qemu_imgs = []
|
|
||||||
for path in Qemu.paths_list():
|
|
||||||
try:
|
|
||||||
for f in os.listdir(path):
|
|
||||||
if (
|
|
||||||
(f == "qemu-img" or f == "qemu-img.exe")
|
|
||||||
and os.access(os.path.join(path, f), os.X_OK)
|
|
||||||
and os.path.isfile(os.path.join(path, f))
|
|
||||||
):
|
|
||||||
qemu_path = os.path.join(path, f)
|
|
||||||
version = await Qemu._get_qemu_img_version(qemu_path)
|
|
||||||
qemu_imgs.append({"path": qemu_path, "version": version})
|
|
||||||
except OSError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return qemu_imgs
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get_qemu_version(qemu_path):
|
async def get_qemu_version(qemu_path):
|
||||||
"""
|
"""
|
||||||
|
@ -95,10 +95,8 @@ class ApplianceToTemplate:
|
|||||||
new_config["options"] = options.strip()
|
new_config["options"] = options.strip()
|
||||||
new_config.update(version.get("images"))
|
new_config.update(version.get("images"))
|
||||||
|
|
||||||
if "path" in appliance_config["qemu"]:
|
if "arch" in appliance_config["qemu"]:
|
||||||
new_config["qemu_path"] = appliance_config["qemu"]["path"]
|
new_config["platform"] = appliance_config["qemu"]["arch"]
|
||||||
else:
|
|
||||||
new_config["qemu_path"] = "qemu-system-{}".format(appliance_config["qemu"]["arch"])
|
|
||||||
|
|
||||||
if "first_port_name" in appliance_config:
|
if "first_port_name" in appliance_config:
|
||||||
new_config["first_port_name"] = appliance_config["first_port_name"]
|
new_config["first_port_name"] = appliance_config["first_port_name"]
|
||||||
|
@ -620,23 +620,6 @@ class Compute:
|
|||||||
raise ControllerError(f"Connection lost to {self._id} during {method} {action}")
|
raise ControllerError(f"Connection lost to {self._id} during {method} {action}")
|
||||||
return res.json
|
return res.json
|
||||||
|
|
||||||
async def images(self, type):
|
|
||||||
"""
|
|
||||||
Return the list of images available for this type on the compute node.
|
|
||||||
"""
|
|
||||||
|
|
||||||
res = await self.http_query("GET", f"/{type}/images", timeout=None)
|
|
||||||
images = res.json
|
|
||||||
|
|
||||||
try:
|
|
||||||
if type in ["qemu", "dynamips", "iou"]:
|
|
||||||
images = sorted(images, key=itemgetter("filename"))
|
|
||||||
else:
|
|
||||||
images = sorted(images, key=itemgetter("image"))
|
|
||||||
except OSError as e:
|
|
||||||
raise ComputeError(f"Cannot list images: {str(e)}")
|
|
||||||
return images
|
|
||||||
|
|
||||||
async def list_files(self, project):
|
async def list_files(self, project):
|
||||||
"""
|
"""
|
||||||
List files in the project on computes
|
List files in the project on computes
|
||||||
|
@ -91,6 +91,8 @@ async def get_computes(app: FastAPI) -> List[dict]:
|
|||||||
def image_filter(change: Change, path: str) -> bool:
|
def image_filter(change: Change, path: str) -> bool:
|
||||||
|
|
||||||
if change == Change.added:
|
if change == Change.added:
|
||||||
|
if path.endswith(".tmp") or path.endswith(".md5sum") or path.startswith("."):
|
||||||
|
return False
|
||||||
header_magic_len = 7
|
header_magic_len = 7
|
||||||
with open(path, "rb") as f:
|
with open(path, "rb") as f:
|
||||||
image_header = f.read(header_magic_len) # read the first 7 bytes of the file
|
image_header = f.read(header_magic_len) # read the first 7 bytes of the file
|
||||||
|
@ -21,7 +21,7 @@ from .version import Version
|
|||||||
|
|
||||||
# Controller schemas
|
# Controller schemas
|
||||||
from .controller.links import LinkCreate, LinkUpdate, Link
|
from .controller.links import LinkCreate, LinkUpdate, Link
|
||||||
from .controller.computes import ComputeCreate, ComputeUpdate, AutoIdlePC, Compute
|
from .controller.computes import ComputeCreate, ComputeUpdate, ComputeVirtualBoxVM, ComputeVMwareVM, ComputeDockerImage, AutoIdlePC, Compute
|
||||||
from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template
|
from .controller.templates import TemplateCreate, TemplateUpdate, TemplateUsage, Template
|
||||||
from .controller.images import Image, ImageType
|
from .controller.images import Image, ImageType
|
||||||
from .controller.appliances import ApplianceVersion, Appliance
|
from .controller.appliances import ApplianceVersion, Appliance
|
||||||
|
@ -148,6 +148,31 @@ class Compute(DateTimeModelMixin, ComputeBase):
|
|||||||
orm_mode = True
|
orm_mode = True
|
||||||
|
|
||||||
|
|
||||||
|
class ComputeVirtualBoxVM(BaseModel):
|
||||||
|
"""
|
||||||
|
VirtualBox VM from compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
vmname: str = Field(..., description="VirtualBox VM name")
|
||||||
|
ram: int = Field(..., description="VirtualBox VM memory")
|
||||||
|
|
||||||
|
|
||||||
|
class ComputeVMwareVM(BaseModel):
|
||||||
|
"""
|
||||||
|
VMware VM from compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
vmname: str = Field(..., description="VMware VM name")
|
||||||
|
|
||||||
|
|
||||||
|
class ComputeDockerImage(BaseModel):
|
||||||
|
"""
|
||||||
|
Docker image from compute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
image: str = Field(..., description="Docker image name")
|
||||||
|
|
||||||
|
|
||||||
class AutoIdlePC(BaseModel):
|
class AutoIdlePC(BaseModel):
|
||||||
"""
|
"""
|
||||||
Data for auto Idle-PC request.
|
Data for auto Idle-PC request.
|
||||||
|
@ -139,7 +139,7 @@ async def discover_images(image_type: str, skip_image_paths: list = None) -> Lis
|
|||||||
for directory in images_directories(image_type):
|
for directory in images_directories(image_type):
|
||||||
for root, _, filenames in os.walk(os.path.normpath(directory)):
|
for root, _, filenames in os.walk(os.path.normpath(directory)):
|
||||||
for filename in filenames:
|
for filename in filenames:
|
||||||
if filename.endswith(".md5sum") or filename.startswith("."):
|
if filename.endswith(".tmp") or filename.endswith(".md5sum") or filename.startswith("."):
|
||||||
continue
|
continue
|
||||||
path = os.path.join(root, filename)
|
path = os.path.join(root, filename)
|
||||||
if not os.path.isfile(path) or skip_image_paths and path in skip_image_paths or path in files:
|
if not os.path.isfile(path) or skip_image_paths and path in skip_image_paths or path in files:
|
||||||
@ -343,7 +343,8 @@ async def write_image(
|
|||||||
os.chmod(image_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
|
os.chmod(image_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
os.remove(tmp_path)
|
if os.path.exists(tmp_path):
|
||||||
|
os.remove(tmp_path)
|
||||||
except OSError:
|
except OSError:
|
||||||
log.warning(f"Could not remove '{tmp_path}'")
|
log.warning(f"Could not remove '{tmp_path}'")
|
||||||
|
|
||||||
|
@ -351,34 +351,6 @@ async def test_qemu_delete_nio(app: FastAPI, compute_client: AsyncClient, qemu_v
|
|||||||
assert response.status_code == status.HTTP_204_NO_CONTENT
|
assert response.status_code == status.HTTP_204_NO_CONTENT
|
||||||
|
|
||||||
|
|
||||||
async def test_qemu_list_binaries(app: FastAPI, compute_client: AsyncClient) -> None:
|
|
||||||
|
|
||||||
ret = [{"path": "/tmp/1", "version": "2.2.0"},
|
|
||||||
{"path": "/tmp/2", "version": "2.1.0"}]
|
|
||||||
|
|
||||||
with asyncio_patch("gns3server.compute.qemu.Qemu.binary_list", return_value=ret) as mock:
|
|
||||||
response = await compute_client.get(app.url_path_for("compute:get_qemu_binaries"))
|
|
||||||
assert mock.called_with(None)
|
|
||||||
assert response.status_code == status.HTTP_200_OK
|
|
||||||
assert response.json() == ret
|
|
||||||
|
|
||||||
|
|
||||||
# async def test_qemu_list_binaries_filter(app: FastAPI, compute_client: AsyncClient, vm: dict) -> None:
|
|
||||||
#
|
|
||||||
# ret = [
|
|
||||||
# {"path": "/tmp/x86_64", "version": "2.2.0"},
|
|
||||||
# {"path": "/tmp/alpha", "version": "2.1.0"},
|
|
||||||
# {"path": "/tmp/i386", "version": "2.1.0"}
|
|
||||||
# ]
|
|
||||||
#
|
|
||||||
# with asyncio_patch("gns3server.compute.qemu.Qemu.binary_list", return_value=ret) as mock:
|
|
||||||
# response = await compute_client.get(app.url_path_for("compute:get_qemu_binaries"),
|
|
||||||
# json={"archs": ["i386"]})
|
|
||||||
# assert response.status_code == status.HTTP_200_OK
|
|
||||||
# assert mock.called_with(["i386"])
|
|
||||||
# assert response.json() == ret
|
|
||||||
|
|
||||||
|
|
||||||
async def test_images(app: FastAPI, compute_client: AsyncClient, fake_qemu_vm) -> None:
|
async def test_images(app: FastAPI, compute_client: AsyncClient, fake_qemu_vm) -> None:
|
||||||
|
|
||||||
response = await compute_client.get(app.url_path_for("compute:get_qemu_images"))
|
response = await compute_client.get(app.url_path_for("compute:get_qemu_images"))
|
||||||
|
@ -109,7 +109,7 @@ class TestComputeRoutes:
|
|||||||
|
|
||||||
class TestComputeFeatures:
|
class TestComputeFeatures:
|
||||||
|
|
||||||
async def test_compute_list_images(self, app: FastAPI, client: AsyncClient) -> None:
|
async def test_compute_list_docker_images(self, app: FastAPI, client: AsyncClient) -> None:
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
@ -123,12 +123,12 @@ class TestComputeFeatures:
|
|||||||
assert response.status_code == status.HTTP_201_CREATED
|
assert response.status_code == status.HTTP_201_CREATED
|
||||||
compute_id = response.json()["compute_id"]
|
compute_id = response.json()["compute_id"]
|
||||||
|
|
||||||
with asyncio_patch("gns3server.controller.compute.Compute.images", return_value=[{"filename": "linux.qcow2"}, {"filename": "asav.qcow2"}]) as mock:
|
with asyncio_patch("gns3server.controller.compute.Compute.forward", return_value=[{"image": "docker1"}, {"image": "docker2"}]) as mock:
|
||||||
response = await client.get(app.url_path_for("delete_compute", compute_id=compute_id) + "/qemu/images")
|
response = await client.get(app.url_path_for("docker_get_images", compute_id=compute_id))
|
||||||
assert response.json() == [{"filename": "linux.qcow2"}, {"filename": "asav.qcow2"}]
|
mock.assert_called_with("GET", "docker", "images")
|
||||||
mock.assert_called_with("qemu")
|
assert response.json() == [{"image": "docker1"}, {"image": "docker2"}]
|
||||||
|
|
||||||
async def test_compute_list_vms(self, app: FastAPI, client: AsyncClient) -> None:
|
async def test_compute_list_virtualbox_vms(self, app: FastAPI, client: AsyncClient) -> None:
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
@ -142,10 +142,28 @@ class TestComputeFeatures:
|
|||||||
compute_id = response.json()["compute_id"]
|
compute_id = response.json()["compute_id"]
|
||||||
|
|
||||||
with asyncio_patch("gns3server.controller.compute.Compute.forward", return_value=[]) as mock:
|
with asyncio_patch("gns3server.controller.compute.Compute.forward", return_value=[]) as mock:
|
||||||
response = await client.get(app.url_path_for("get_compute", compute_id=compute_id) + "/virtualbox/vms")
|
response = await client.get(app.url_path_for("virtualbox_vms", compute_id=compute_id))
|
||||||
mock.assert_called_with("GET", "virtualbox", "vms")
|
mock.assert_called_with("GET", "virtualbox", "vms")
|
||||||
assert response.json() == []
|
assert response.json() == []
|
||||||
|
|
||||||
|
async def test_compute_list_vmware_vms(self, app: FastAPI, client: AsyncClient) -> None:
|
||||||
|
|
||||||
|
params = {
|
||||||
|
"protocol": "http",
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 4243,
|
||||||
|
"user": "julien",
|
||||||
|
"password": "secure"
|
||||||
|
}
|
||||||
|
response = await client.post(app.url_path_for("get_computes"), json=params)
|
||||||
|
assert response.status_code == status.HTTP_201_CREATED
|
||||||
|
compute_id = response.json()["compute_id"]
|
||||||
|
|
||||||
|
with asyncio_patch("gns3server.controller.compute.Compute.forward", return_value=[]) as mock:
|
||||||
|
response = await client.get(app.url_path_for("vmware_vms", compute_id=compute_id))
|
||||||
|
mock.assert_called_with("GET", "vmware", "vms")
|
||||||
|
assert response.json() == []
|
||||||
|
|
||||||
async def test_compute_create_img(self, app: FastAPI, client: AsyncClient) -> None:
|
async def test_compute_create_img(self, app: FastAPI, client: AsyncClient) -> None:
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
|
@ -82,29 +82,29 @@ async def test_binary_list(monkeypatch, tmpdir):
|
|||||||
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
# @pytest.mark.asyncio
|
||||||
async def test_img_binary_list(monkeypatch, tmpdir):
|
# async def test_img_binary_list(monkeypatch, tmpdir):
|
||||||
|
#
|
||||||
monkeypatch.setenv("PATH", str(tmpdir))
|
# monkeypatch.setenv("PATH", str(tmpdir))
|
||||||
files_to_create = ["qemu-img", "qemu-io", "qemu-system-x86", "qemu-system-x42", "qemu-kvm", "hello"]
|
# files_to_create = ["qemu-img", "qemu-io", "qemu-system-x86", "qemu-system-x42", "qemu-kvm", "hello"]
|
||||||
|
#
|
||||||
for file_to_create in files_to_create:
|
# for file_to_create in files_to_create:
|
||||||
path = os.path.join(os.environ["PATH"], file_to_create)
|
# path = os.path.join(os.environ["PATH"], file_to_create)
|
||||||
with open(path, "w+") as f:
|
# with open(path, "w+") as f:
|
||||||
f.write("1")
|
# f.write("1")
|
||||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
# os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
||||||
|
#
|
||||||
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="qemu-img version 2.2.0, Copyright (c) 2004-2008 Fabrice Bellard") as mock:
|
# with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="qemu-img version 2.2.0, Copyright (c) 2004-2008 Fabrice Bellard") as mock:
|
||||||
qemus = await Qemu.img_binary_list()
|
# qemus = await Qemu.img_binary_list()
|
||||||
|
#
|
||||||
version = "2.2.0"
|
# version = "2.2.0"
|
||||||
|
#
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-img"), "version": version} in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "qemu-img"), "version": version} in qemus
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-io"), "version": version} not in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "qemu-io"), "version": version} not in qemus
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86"), "version": version} not in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86"), "version": version} not in qemus
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-kvm"), "version": version} not in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "qemu-kvm"), "version": version} not in qemus
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x42"), "version": version} not in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x42"), "version": version} not in qemus
|
||||||
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
# assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
||||||
|
|
||||||
|
|
||||||
def test_get_legacy_vm_workdir():
|
def test_get_legacy_vm_workdir():
|
||||||
|
@ -397,29 +397,6 @@ async def test_forward_post(compute):
|
|||||||
await compute.close()
|
await compute.close()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_images(compute):
|
|
||||||
"""
|
|
||||||
Will return image on compute
|
|
||||||
"""
|
|
||||||
|
|
||||||
response = MagicMock()
|
|
||||||
response.status = 200
|
|
||||||
response.read = AsyncioMagicMock(return_value=json.dumps([{
|
|
||||||
"filename": "linux.qcow2",
|
|
||||||
"path": "linux.qcow2",
|
|
||||||
"md5sum": "d41d8cd98f00b204e9800998ecf8427e",
|
|
||||||
"filesize": 0}]).encode())
|
|
||||||
with asyncio_patch("aiohttp.ClientSession.request", return_value=response) as mock:
|
|
||||||
images = await compute.images("qemu")
|
|
||||||
mock.assert_called_with("GET", "https://example.com:84/v3/compute/qemu/images", auth=None, data=None, headers={'content-type': 'application/json'}, chunked=None, timeout=None)
|
|
||||||
await compute.close()
|
|
||||||
|
|
||||||
assert images == [
|
|
||||||
{"filename": "linux.qcow2", "path": "linux.qcow2", "md5sum": "d41d8cd98f00b204e9800998ecf8427e", "filesize": 0}
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_list_files(project, compute):
|
async def test_list_files(project, compute):
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user