mirror of
https://github.com/GNS3/gns3-server.git
synced 2025-01-18 07:23:47 +02:00
Merge remote-tracking branch 'origin/master' into 2.0
This commit is contained in:
commit
540ffdf5f3
@ -216,3 +216,8 @@ If you want test coverage:
|
||||
.. code:: bash
|
||||
|
||||
py.test --cov-report term-missing --cov=gns3server
|
||||
|
||||
Security issues
|
||||
----------------
|
||||
Please contact us using contact informations available here:
|
||||
http://docs.gns3.com/1ON9JBXSeR7Nt2-Qum2o3ZX0GU86BZwlmNSUgvmqNWGY/index.html
|
||||
|
230
gns3server/handlers/api/vpcs_handler.py
Normal file
230
gns3server/handlers/api/vpcs_handler.py
Normal file
@ -0,0 +1,230 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2015 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from aiohttp.web import HTTPConflict
|
||||
from ...web.route import Route
|
||||
from ...schemas.nio import NIO_SCHEMA
|
||||
from ...schemas.vpcs import VPCS_CREATE_SCHEMA
|
||||
from ...schemas.vpcs import VPCS_UPDATE_SCHEMA
|
||||
from ...schemas.vpcs import VPCS_OBJECT_SCHEMA
|
||||
from ...modules.vpcs import VPCS
|
||||
|
||||
|
||||
class VPCSHandler:
|
||||
|
||||
"""
|
||||
API entry points for VPCS.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
@Route.post(
|
||||
r"/projects/{project_id}/vpcs/vms",
|
||||
parameters={
|
||||
"project_id": "UUID for the project"
|
||||
},
|
||||
status_codes={
|
||||
201: "Instance created",
|
||||
400: "Invalid request",
|
||||
409: "Conflict"
|
||||
},
|
||||
description="Create a new VPCS instance",
|
||||
input=VPCS_CREATE_SCHEMA,
|
||||
output=VPCS_OBJECT_SCHEMA)
|
||||
def create(request, response):
|
||||
|
||||
vpcs = VPCS.instance()
|
||||
vm = yield from vpcs.create_vm(request.json["name"],
|
||||
request.match_info["project_id"],
|
||||
request.json.get("vm_id"),
|
||||
console=request.json.get("console"),
|
||||
startup_script=request.json.get("startup_script"))
|
||||
response.set_status(201)
|
||||
response.json(vm)
|
||||
|
||||
@classmethod
|
||||
@Route.get(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance"
|
||||
},
|
||||
status_codes={
|
||||
200: "Success",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Get a VPCS instance",
|
||||
output=VPCS_OBJECT_SCHEMA)
|
||||
def show(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
response.json(vm)
|
||||
|
||||
@classmethod
|
||||
@Route.put(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance"
|
||||
},
|
||||
status_codes={
|
||||
200: "Instance updated",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist",
|
||||
409: "Conflict"
|
||||
},
|
||||
description="Update a VPCS instance",
|
||||
input=VPCS_UPDATE_SCHEMA,
|
||||
output=VPCS_OBJECT_SCHEMA)
|
||||
def update(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
vm.name = request.json.get("name", vm.name)
|
||||
vm.console = request.json.get("console", vm.console)
|
||||
vm.startup_script = request.json.get("startup_script", vm.startup_script)
|
||||
response.json(vm)
|
||||
|
||||
@classmethod
|
||||
@Route.delete(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance"
|
||||
},
|
||||
status_codes={
|
||||
204: "Instance deleted",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Delete a VPCS instance")
|
||||
def delete(request, response):
|
||||
|
||||
yield from VPCS.instance().delete_vm(request.match_info["vm_id"])
|
||||
response.set_status(204)
|
||||
|
||||
@classmethod
|
||||
@Route.post(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}/start",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance"
|
||||
},
|
||||
status_codes={
|
||||
204: "Instance started",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Start a VPCS instance",
|
||||
output=VPCS_OBJECT_SCHEMA)
|
||||
def start(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
yield from vm.start()
|
||||
response.json(vm)
|
||||
|
||||
@classmethod
|
||||
@Route.post(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}/stop",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance"
|
||||
},
|
||||
status_codes={
|
||||
204: "Instance stopped",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Stop a VPCS instance")
|
||||
def stop(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
yield from vm.stop()
|
||||
response.set_status(204)
|
||||
|
||||
@classmethod
|
||||
@Route.post(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}/reload",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance",
|
||||
},
|
||||
status_codes={
|
||||
204: "Instance reloaded",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Reload a VPCS instance")
|
||||
def reload(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
yield from vm.reload()
|
||||
response.set_status(204)
|
||||
|
||||
@Route.post(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance",
|
||||
"adapter_number": "Network adapter where the nio is located",
|
||||
"port_number": "Port where the nio should be added"
|
||||
},
|
||||
status_codes={
|
||||
201: "NIO created",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Add a NIO to a VPCS instance",
|
||||
input=NIO_SCHEMA,
|
||||
output=NIO_SCHEMA)
|
||||
def create_nio(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
nio_type = request.json["type"]
|
||||
if nio_type not in ("nio_udp", "nio_tap"):
|
||||
raise HTTPConflict(text="NIO of type {} is not supported".format(nio_type))
|
||||
nio = vpcs_manager.create_nio(vm.vpcs_path(), request.json)
|
||||
vm.port_add_nio_binding(int(request.match_info["port_number"]), nio)
|
||||
response.set_status(201)
|
||||
response.json(nio)
|
||||
|
||||
@classmethod
|
||||
@Route.delete(
|
||||
r"/projects/{project_id}/vpcs/vms/{vm_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
|
||||
parameters={
|
||||
"project_id": "UUID for the project",
|
||||
"vm_id": "UUID for the instance",
|
||||
"adapter_number": "Network adapter where the nio is located",
|
||||
"port_number": "Port from where the nio should be removed"
|
||||
},
|
||||
status_codes={
|
||||
204: "NIO deleted",
|
||||
400: "Invalid request",
|
||||
404: "Instance doesn't exist"
|
||||
},
|
||||
description="Remove a NIO from a VPCS instance")
|
||||
def delete_nio(request, response):
|
||||
|
||||
vpcs_manager = VPCS.instance()
|
||||
vm = vpcs_manager.get_vm(request.match_info["vm_id"], project_id=request.match_info["project_id"])
|
||||
vm.port_remove_nio_binding(int(request.match_info["port_number"]))
|
||||
response.set_status(204)
|
493
tests/modules/test_project.py
Normal file
493
tests/modules/test_project.py
Normal file
@ -0,0 +1,493 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2015 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
import uuid
|
||||
import json
|
||||
import asyncio
|
||||
import pytest
|
||||
import aiohttp
|
||||
import zipfile
|
||||
from uuid import uuid4
|
||||
from unittest.mock import patch
|
||||
|
||||
from tests.utils import asyncio_patch
|
||||
from gns3server.modules.project import Project
|
||||
from gns3server.modules.vpcs import VPCS, VPCSVM
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def manager(port_manager):
|
||||
m = VPCS.instance()
|
||||
m.port_manager = port_manager
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def vm(project, manager, loop):
|
||||
vm = manager.create_vm("test", project.id, "00010203-0405-0607-0809-0a0b0c0d0e0f")
|
||||
return loop.run_until_complete(asyncio.async(vm))
|
||||
|
||||
|
||||
def test_affect_uuid():
|
||||
p = Project()
|
||||
assert len(p.id) == 36
|
||||
|
||||
p = Project(project_id='00010203-0405-0607-0809-0a0b0c0d0e0f')
|
||||
assert p.id == '00010203-0405-0607-0809-0a0b0c0d0e0f'
|
||||
|
||||
|
||||
def test_path(tmpdir):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
p = Project(location=str(tmpdir))
|
||||
assert p.path == os.path.join(str(tmpdir), p.id)
|
||||
assert os.path.exists(os.path.join(str(tmpdir), p.id))
|
||||
assert not os.path.exists(os.path.join(p.path, ".gns3_temporary"))
|
||||
|
||||
|
||||
def test_init_path(tmpdir):
|
||||
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
p = Project(path=str(tmpdir))
|
||||
assert p.path == str(tmpdir)
|
||||
|
||||
|
||||
def test_changing_path_temporary_flag(tmpdir):
|
||||
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
p = Project(temporary=True)
|
||||
assert os.path.exists(p.path)
|
||||
original_path = p.path
|
||||
assert os.path.exists(os.path.join(p.path, ".gns3_temporary"))
|
||||
|
||||
p.path = str(tmpdir)
|
||||
|
||||
|
||||
def test_temporary_path():
|
||||
p = Project(temporary=True)
|
||||
assert os.path.exists(p.path)
|
||||
assert os.path.exists(os.path.join(p.path, ".gns3_temporary"))
|
||||
|
||||
|
||||
def test_remove_temporary_flag():
|
||||
p = Project(temporary=True)
|
||||
assert os.path.exists(p.path)
|
||||
assert os.path.exists(os.path.join(p.path, ".gns3_temporary"))
|
||||
p.temporary = False
|
||||
assert not os.path.exists(os.path.join(p.path, ".gns3_temporary"))
|
||||
|
||||
|
||||
def test_changing_location_not_allowed(tmpdir):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=False):
|
||||
with pytest.raises(aiohttp.web.HTTPForbidden):
|
||||
p = Project(location=str(tmpdir))
|
||||
|
||||
|
||||
def test_changing_path_not_allowed(tmpdir):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=False):
|
||||
with pytest.raises(aiohttp.web.HTTPForbidden):
|
||||
p = Project()
|
||||
p.path = str(tmpdir)
|
||||
|
||||
|
||||
def test_changing_path_with_quote_not_allowed(tmpdir):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
with pytest.raises(aiohttp.web.HTTPForbidden):
|
||||
p = Project()
|
||||
p.path = str(tmpdir / "project\"53")
|
||||
|
||||
|
||||
def test_json(tmpdir):
|
||||
p = Project()
|
||||
assert p.__json__() == {"name": p.name, "location": p.location, "path": p.path, "project_id": p.id, "temporary": False}
|
||||
|
||||
|
||||
def test_vm_working_directory(tmpdir, vm):
|
||||
with patch("gns3server.modules.project.Project.is_local", return_value=True):
|
||||
p = Project(location=str(tmpdir))
|
||||
assert p.vm_working_directory(vm) == os.path.join(str(tmpdir), p.id, 'project-files', vm.module_name, vm.id)
|
||||
assert os.path.exists(p.vm_working_directory(vm))
|
||||
|
||||
|
||||
def test_mark_vm_for_destruction(vm):
|
||||
project = Project()
|
||||
project.add_vm(vm)
|
||||
project.mark_vm_for_destruction(vm)
|
||||
assert len(project._vms_to_destroy) == 1
|
||||
assert len(project.vms) == 0
|
||||
|
||||
|
||||
def test_commit(manager, loop):
|
||||
project = Project()
|
||||
vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager)
|
||||
project.add_vm(vm)
|
||||
directory = project.vm_working_directory(vm)
|
||||
project.mark_vm_for_destruction(vm)
|
||||
assert len(project._vms_to_destroy) == 1
|
||||
assert os.path.exists(directory)
|
||||
loop.run_until_complete(asyncio.async(project.commit()))
|
||||
assert len(project._vms_to_destroy) == 0
|
||||
assert os.path.exists(directory) is False
|
||||
assert len(project.vms) == 0
|
||||
|
||||
|
||||
def test_commit_permission_issue(manager, loop):
|
||||
"""
|
||||
GNS3 will fix the permission and continue to delete
|
||||
"""
|
||||
project = Project()
|
||||
vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager)
|
||||
project.add_vm(vm)
|
||||
directory = project.vm_working_directory(vm)
|
||||
project.mark_vm_for_destruction(vm)
|
||||
assert len(project._vms_to_destroy) == 1
|
||||
assert os.path.exists(directory)
|
||||
os.chmod(directory, 0)
|
||||
loop.run_until_complete(asyncio.async(project.commit()))
|
||||
|
||||
|
||||
def test_project_delete(loop):
|
||||
project = Project()
|
||||
directory = project.path
|
||||
assert os.path.exists(directory)
|
||||
loop.run_until_complete(asyncio.async(project.delete()))
|
||||
assert os.path.exists(directory) is False
|
||||
|
||||
|
||||
def test_project_delete_permission_issue(loop):
|
||||
project = Project()
|
||||
directory = project.path
|
||||
assert os.path.exists(directory)
|
||||
os.chmod(directory, 0)
|
||||
with pytest.raises(aiohttp.web.HTTPInternalServerError):
|
||||
loop.run_until_complete(asyncio.async(project.delete()))
|
||||
os.chmod(directory, 700)
|
||||
|
||||
|
||||
def test_project_add_vm(manager):
|
||||
project = Project()
|
||||
vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager)
|
||||
project.add_vm(vm)
|
||||
assert len(project.vms) == 1
|
||||
|
||||
|
||||
def test_project_close(loop, vm, project):
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.close") as mock:
|
||||
loop.run_until_complete(asyncio.async(project.close()))
|
||||
assert mock.called
|
||||
assert vm.id not in vm.manager._vms
|
||||
|
||||
|
||||
def test_project_close_temporary_project(loop, manager):
|
||||
"""A temporary project is deleted when closed"""
|
||||
|
||||
project = Project(temporary=True)
|
||||
directory = project.path
|
||||
assert os.path.exists(directory)
|
||||
loop.run_until_complete(asyncio.async(project.close()))
|
||||
assert os.path.exists(directory) is False
|
||||
|
||||
|
||||
def test_get_default_project_directory(monkeypatch):
|
||||
|
||||
monkeypatch.undo()
|
||||
project = Project()
|
||||
path = os.path.normpath(os.path.expanduser("~/GNS3/projects"))
|
||||
assert project._get_default_project_directory() == path
|
||||
assert os.path.exists(path)
|
||||
|
||||
|
||||
def test_clean_project_directory(tmpdir):
|
||||
|
||||
# A non anonymous project with uuid.
|
||||
project1 = tmpdir / str(uuid4())
|
||||
project1.mkdir()
|
||||
|
||||
# A non anonymous project.
|
||||
oldproject = tmpdir / str(uuid4())
|
||||
oldproject.mkdir()
|
||||
|
||||
# an anonymous project
|
||||
project2 = tmpdir / str(uuid4())
|
||||
project2.mkdir()
|
||||
tmp = (project2 / ".gns3_temporary")
|
||||
with open(str(tmp), 'w+') as f:
|
||||
f.write("1")
|
||||
|
||||
with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}):
|
||||
Project.clean_project_directory()
|
||||
|
||||
assert os.path.exists(str(project1))
|
||||
assert os.path.exists(str(oldproject))
|
||||
assert not os.path.exists(str(project2))
|
||||
|
||||
|
||||
def test_list_files(tmpdir, loop):
|
||||
|
||||
with patch("gns3server.config.Config.get_section_config", return_value={"project_directory": str(tmpdir)}):
|
||||
project = Project()
|
||||
path = project.path
|
||||
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
|
||||
with open(os.path.join(path, "vm-1", "dynamips", "test.bin"), "w+") as f:
|
||||
f.write("test")
|
||||
open(os.path.join(path, "vm-1", "dynamips", "test.ghost"), "w+").close()
|
||||
with open(os.path.join(path, "test.txt"), "w+") as f:
|
||||
f.write("test2")
|
||||
|
||||
files = loop.run_until_complete(asyncio.async(project.list_files()))
|
||||
|
||||
assert files == [
|
||||
{
|
||||
"path": "test.txt",
|
||||
"md5sum": "ad0234829205b9033196ba818f7a872b"
|
||||
},
|
||||
{
|
||||
"path": os.path.join("vm-1", "dynamips", "test.bin"),
|
||||
"md5sum": "098f6bcd4621d373cade4e832627b4f6"
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_export(tmpdir):
|
||||
project = Project()
|
||||
path = project.path
|
||||
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
|
||||
|
||||
# The .gns3 should be renamed project.gns3 in order to simplify import
|
||||
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
||||
f.write("{}")
|
||||
|
||||
with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f:
|
||||
f.write("HELLO")
|
||||
with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f:
|
||||
f.write("LOG")
|
||||
os.makedirs(os.path.join(path, "project-files", "snapshots"))
|
||||
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
|
||||
f.write("WORLD")
|
||||
|
||||
z = project.export()
|
||||
|
||||
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
||||
for data in z:
|
||||
f.write(data)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
||||
with myzip.open("vm-1/dynamips/test") as myfile:
|
||||
content = myfile.read()
|
||||
assert content == b"HELLO"
|
||||
|
||||
assert 'test.gns3' not in myzip.namelist()
|
||||
assert 'project.gns3' in myzip.namelist()
|
||||
assert 'project-files/snapshots/test' not in myzip.namelist()
|
||||
assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist()
|
||||
|
||||
|
||||
def test_export_fix_path(tmpdir):
|
||||
"""
|
||||
Fix absolute image path
|
||||
"""
|
||||
project = Project()
|
||||
path = project.path
|
||||
|
||||
topology = {
|
||||
"topology": {
|
||||
"nodes": [
|
||||
{
|
||||
"properties": {
|
||||
"image": "/tmp/c3725-adventerprisek9-mz.124-25d.image"
|
||||
},
|
||||
"type": "C3725"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
||||
json.dump(topology, f)
|
||||
|
||||
z = project.export()
|
||||
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
||||
for data in z:
|
||||
f.write(data)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
||||
with myzip.open("project.gns3") as myfile:
|
||||
content = myfile.read().decode()
|
||||
topology = json.loads(content)
|
||||
assert topology["topology"]["nodes"][0]["properties"]["image"] == "c3725-adventerprisek9-mz.124-25d.image"
|
||||
|
||||
|
||||
def test_export_with_images(tmpdir):
|
||||
"""
|
||||
Fix absolute image path
|
||||
"""
|
||||
project = Project()
|
||||
path = project.path
|
||||
|
||||
os.makedirs(str(tmpdir / "IOS"))
|
||||
with open(str(tmpdir / "IOS" / "test.image"), "w+") as f:
|
||||
f.write("AAA")
|
||||
|
||||
topology = {
|
||||
"topology": {
|
||||
"nodes": [
|
||||
{
|
||||
"properties": {
|
||||
"image": "test.image"
|
||||
},
|
||||
"type": "C3725"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
||||
json.dump(topology, f)
|
||||
|
||||
with patch("gns3server.modules.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
|
||||
z = project.export(include_images=True)
|
||||
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
||||
for data in z:
|
||||
f.write(data)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
||||
myzip.getinfo("images/IOS/test.image")
|
||||
|
||||
|
||||
def test_export_with_vm(tmpdir):
|
||||
project = Project()
|
||||
path = project.path
|
||||
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
|
||||
|
||||
# The .gns3 should be renamed project.gns3 in order to simplify import
|
||||
with open(os.path.join(path, "test.gns3"), 'w+') as f:
|
||||
f.write("{}")
|
||||
|
||||
with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f:
|
||||
f.write("HELLO")
|
||||
with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f:
|
||||
f.write("LOG")
|
||||
os.makedirs(os.path.join(path, "project-files", "snapshots"))
|
||||
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
|
||||
f.write("WORLD")
|
||||
|
||||
os.makedirs(os.path.join(path, "servers", "vm", "project-files", "docker"))
|
||||
with open(os.path.join(path, "servers", "vm", "project-files", "docker", "busybox"), 'w+') as f:
|
||||
f.write("DOCKER")
|
||||
|
||||
z = project.export()
|
||||
|
||||
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
|
||||
for data in z:
|
||||
f.write(data)
|
||||
|
||||
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
|
||||
with myzip.open("vm-1/dynamips/test") as myfile:
|
||||
content = myfile.read()
|
||||
assert content == b"HELLO"
|
||||
|
||||
assert 'test.gns3' not in myzip.namelist()
|
||||
assert 'project.gns3' in myzip.namelist()
|
||||
assert 'project-files/snapshots/test' not in myzip.namelist()
|
||||
assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist()
|
||||
assert 'servers/vm/project-files/docker/busybox' not in myzip.namelist()
|
||||
assert 'project-files/docker/busybox' in myzip.namelist()
|
||||
|
||||
|
||||
def test_import(tmpdir):
|
||||
|
||||
project_id = str(uuid.uuid4())
|
||||
project = Project(name="test", project_id=project_id)
|
||||
|
||||
topology = {
|
||||
"project_id": str(uuid.uuid4()),
|
||||
"name": "testtest",
|
||||
"topology": {
|
||||
"nodes": [
|
||||
{
|
||||
"server_id": 3,
|
||||
"type": "VPCSDevice"
|
||||
},
|
||||
{
|
||||
"server_id": 3,
|
||||
"type": "QemuVM"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
with open(str(tmpdir / "project.gns3"), 'w+') as f:
|
||||
json.dump(topology, f)
|
||||
with open(str(tmpdir / "b.png"), 'w+') as f:
|
||||
f.write("B")
|
||||
|
||||
zip_path = str(tmpdir / "project.zip")
|
||||
with zipfile.ZipFile(zip_path, 'w') as myzip:
|
||||
myzip.write(str(tmpdir / "project.gns3"), "project.gns3")
|
||||
myzip.write(str(tmpdir / "b.png"), "b.png")
|
||||
myzip.write(str(tmpdir / "b.png"), "project-files/dynamips/test")
|
||||
myzip.write(str(tmpdir / "b.png"), "project-files/qemu/test")
|
||||
|
||||
with open(zip_path, "rb") as f:
|
||||
project.import_zip(f)
|
||||
|
||||
assert os.path.exists(os.path.join(project.path, "b.png"))
|
||||
assert os.path.exists(os.path.join(project.path, "test.gns3"))
|
||||
assert os.path.exists(os.path.join(project.path, "project-files/dynamips/test"))
|
||||
assert os.path.exists(os.path.join(project.path, "servers/vm/project-files/qemu/test"))
|
||||
|
||||
with open(os.path.join(project.path, "test.gns3")) as f:
|
||||
content = json.load(f)
|
||||
|
||||
assert content["name"] == "test"
|
||||
assert content["project_id"] == project_id
|
||||
assert content["topology"]["servers"] == [
|
||||
{
|
||||
"id": 1,
|
||||
"local": True,
|
||||
"vm": False
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"local": False,
|
||||
"vm": True
|
||||
},
|
||||
]
|
||||
assert content["topology"]["nodes"][0]["server_id"] == 1
|
||||
assert content["topology"]["nodes"][1]["server_id"] == 2
|
||||
|
||||
|
||||
def test_import_with_images(tmpdir):
|
||||
|
||||
project_id = str(uuid.uuid4())
|
||||
project = Project(name="test", project_id=project_id)
|
||||
|
||||
with open(str(tmpdir / "test.image"), 'w+') as f:
|
||||
f.write("B")
|
||||
|
||||
zip_path = str(tmpdir / "project.zip")
|
||||
with zipfile.ZipFile(zip_path, 'w') as myzip:
|
||||
myzip.write(str(tmpdir / "test.image"), "images/IOS/test.image")
|
||||
|
||||
with open(zip_path, "rb") as f:
|
||||
project.import_zip(f)
|
||||
|
||||
# TEST import images
|
||||
path = os.path.join(project._config().get("images_path"), "IOS", "test.image")
|
||||
assert os.path.exists(path), path
|
284
tests/modules/vpcs/test_vpcs_vm.py
Normal file
284
tests/modules/vpcs/test_vpcs_vm.py
Normal file
@ -0,0 +1,284 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright (C) 2015 GNS3 Technologies Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import pytest
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
|
||||
from tests.utils import asyncio_patch
|
||||
from gns3server.utils import parse_version
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from gns3server.modules.vpcs.vpcs_vm import VPCSVM
|
||||
from gns3server.modules.vpcs.vpcs_error import VPCSError
|
||||
from gns3server.modules.vpcs import VPCS
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def manager(port_manager):
|
||||
m = VPCS.instance()
|
||||
m.port_manager = port_manager
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def vm(project, manager):
|
||||
vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager)
|
||||
vm._vpcs_version = parse_version("0.9")
|
||||
return vm
|
||||
|
||||
|
||||
def test_vm(project, manager):
|
||||
vm = VPCSVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager)
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
|
||||
|
||||
|
||||
def test_vm_check_vpcs_version(loop, vm, manager):
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.9"):
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
assert vm._vpcs_version == parse_version("0.9")
|
||||
|
||||
|
||||
def test_vm_check_vpcs_version_0_6_1(loop, vm, manager):
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.6.1"):
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
assert vm._vpcs_version == parse_version("0.6.1")
|
||||
|
||||
|
||||
def test_vm_invalid_vpcs_version(loop, manager, vm):
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.1"):
|
||||
with pytest.raises(VPCSError):
|
||||
nio = manager.create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
|
||||
|
||||
|
||||
def test_vm_invalid_vpcs_path(vm, manager, loop):
|
||||
with patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.vpcs_path", return_value="/tmp/fake/path/vpcs"):
|
||||
with pytest.raises(VPCSError):
|
||||
nio = manager.create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
||||
|
||||
|
||||
def test_start(loop, vm):
|
||||
process = MagicMock()
|
||||
process.returncode = None
|
||||
queue = vm.project.get_listen_queue()
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert mock_exec.call_args[0] == (vm.vpcs_path(),
|
||||
'-p',
|
||||
str(vm.console),
|
||||
'-m', '1',
|
||||
'-i',
|
||||
'1',
|
||||
'-F',
|
||||
'-R',
|
||||
'-s',
|
||||
'4242',
|
||||
'-c',
|
||||
'4243',
|
||||
'-t',
|
||||
'127.0.0.1')
|
||||
assert vm.is_running()
|
||||
assert vm.command_line == ' '.join(mock_exec.call_args[0])
|
||||
(action, event) = queue.get_nowait()
|
||||
assert action == "vm.started"
|
||||
assert event == vm
|
||||
|
||||
|
||||
def test_start_0_6_1(loop, vm):
|
||||
"""
|
||||
Version 0.6.1 doesn't have the -R options. It's not require
|
||||
because GNS3 provide a patch for this.
|
||||
"""
|
||||
process = MagicMock()
|
||||
process.returncode = None
|
||||
queue = vm.project.get_listen_queue()
|
||||
vm._vpcs_version = parse_version("0.6.1")
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert mock_exec.call_args[0] == (vm.vpcs_path(),
|
||||
'-p',
|
||||
str(vm.console),
|
||||
'-m', '1',
|
||||
'-i',
|
||||
'1',
|
||||
'-F',
|
||||
'-s',
|
||||
'4242',
|
||||
'-c',
|
||||
'4243',
|
||||
'-t',
|
||||
'127.0.0.1')
|
||||
assert vm.is_running()
|
||||
(action, event) = queue.get_nowait()
|
||||
assert action == "vm.started"
|
||||
assert event == vm
|
||||
|
||||
|
||||
def test_stop(loop, vm):
|
||||
process = MagicMock()
|
||||
|
||||
# Wait process kill success
|
||||
future = asyncio.Future()
|
||||
future.set_result(True)
|
||||
process.wait.return_value = future
|
||||
process.returncode = None
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert vm.is_running()
|
||||
|
||||
queue = vm.project.get_listen_queue()
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
|
||||
if sys.platform.startswith("win"):
|
||||
process.send_signal.assert_called_with(1)
|
||||
else:
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
(action, event) = queue.get_nowait()
|
||||
assert action == "vm.stopped"
|
||||
assert event == vm
|
||||
|
||||
|
||||
def test_reload(loop, vm):
|
||||
process = MagicMock()
|
||||
|
||||
# Wait process kill success
|
||||
future = asyncio.Future()
|
||||
future.set_result(True)
|
||||
process.wait.return_value = future
|
||||
process.returncode = None
|
||||
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
assert vm.is_running()
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.reload()))
|
||||
assert vm.is_running() is True
|
||||
|
||||
if sys.platform.startswith("win"):
|
||||
process.send_signal.assert_called_with(1)
|
||||
else:
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
|
||||
def test_add_nio_binding_udp(vm):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
assert nio.lport == 4242
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
def test_add_nio_binding_tap(vm, ethernet_device):
|
||||
with patch("gns3server.modules.base_manager.BaseManager.has_privileged_access", return_value=True):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_tap", "tap_device": ethernet_device})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
assert nio.tap_device == ethernet_device
|
||||
|
||||
|
||||
def test_port_remove_nio_binding(vm):
|
||||
nio = VPCS.instance().create_nio(vm.vpcs_path(), {"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
vm.port_remove_nio_binding(0)
|
||||
assert vm._ethernet_adapter.ports[0] is None
|
||||
|
||||
|
||||
def test_update_startup_script(vm):
|
||||
content = "echo GNS3 VPCS\nip 192.168.1.2\n"
|
||||
vm.startup_script = content
|
||||
filepath = os.path.join(vm.working_dir, 'startup.vpc')
|
||||
assert os.path.exists(filepath)
|
||||
with open(filepath) as f:
|
||||
assert f.read() == content
|
||||
|
||||
|
||||
def test_update_startup_script_h(vm):
|
||||
content = "setname %h\n"
|
||||
vm.name = "pc1"
|
||||
vm.startup_script = content
|
||||
assert os.path.exists(vm.script_file)
|
||||
with open(vm.script_file) as f:
|
||||
assert f.read() == "setname pc1\n"
|
||||
|
||||
|
||||
def test_get_startup_script(vm):
|
||||
content = "echo GNS3 VPCS\nip 192.168.1.2"
|
||||
vm.startup_script = content
|
||||
assert vm.startup_script == os.linesep.join(["echo GNS3 VPCS", "ip 192.168.1.2"])
|
||||
|
||||
|
||||
def test_get_startup_script_using_default_script(vm):
|
||||
content = "echo GNS3 VPCS\nip 192.168.1.2\n"
|
||||
|
||||
# Reset script file location
|
||||
vm._script_file = None
|
||||
|
||||
filepath = os.path.join(vm.working_dir, 'startup.vpc')
|
||||
with open(filepath, 'wb+') as f:
|
||||
assert f.write(content.encode("utf-8"))
|
||||
|
||||
assert vm.startup_script == content
|
||||
assert vm.script_file == filepath
|
||||
|
||||
|
||||
def test_change_name(vm, tmpdir):
|
||||
path = os.path.join(vm.working_dir, 'startup.vpc')
|
||||
vm.name = "world"
|
||||
with open(path, 'w+') as f:
|
||||
f.write("name world")
|
||||
vm.name = "hello"
|
||||
assert vm.name == "hello"
|
||||
with open(path) as f:
|
||||
assert f.read() == "name hello"
|
||||
|
||||
|
||||
def test_close(vm, port_manager, loop):
|
||||
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||
vm.start()
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
assert vm.is_running() is False
|
Loading…
Reference in New Issue
Block a user