gns3-server/tests/controller/test_export_project.py

462 lines
15 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import json
import pytest
import aiohttp
import zipfile
from pathlib import Path
from unittest.mock import patch
from unittest.mock import MagicMock
from tests.utils import AsyncioMagicMock, AsyncioBytesIO
from gns3server.controller.project import Project
2018-04-28 12:46:47 +03:00
from gns3server.controller.export_project import export_project, _is_exportable
from gns3server.utils.asyncio import aiozipstream
@pytest.fixture
def project(controller):
p = Project(controller=controller, name="test")
p.dump = MagicMock()
return p
@pytest.fixture
def node(controller, project, async_run):
compute = MagicMock()
compute.id = "local"
response = MagicMock()
response.json = {"console": 2048}
compute.post = AsyncioMagicMock(return_value=response)
node = async_run(project.add_node(compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
return node
async def write_file(path, z):
with open(path, 'wb') as f:
async for chunk in z:
f.write(chunk)
2018-04-28 12:46:47 +03:00
def test_exportable_files():
assert _is_exportable("hello/world")
assert not _is_exportable("project-files/tmp")
assert not _is_exportable("project-files/test_log.txt")
assert not _is_exportable("project-files/test.log")
assert not _is_exportable("test/snapshots")
assert not _is_exportable("test/project-files/snapshots")
assert not _is_exportable("test/project-files/snapshots/test.gns3p")
2016-07-21 21:17:36 +03:00
def test_export(tmpdir, project, async_run):
path = project.path
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
os.makedirs(str(tmpdir / "IOS"))
with open(str(tmpdir / "IOS" / "test.image"), "w+") as f:
f.write("AAA")
# The .gns3 should be renamed project.gns3 in order to simplify import
with open(os.path.join(path, "test.gns3"), 'w+') as f:
2016-07-22 14:39:57 +03:00
data = {
"topology": {
"computes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"host": "127.0.0.1",
"name": "Remote 1",
"port": 8001,
"protocol": "http"
}
],
"nodes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"node_type": "dynamips",
"properties": {
"image": "test.image"
}
2016-07-22 14:39:57 +03:00
}
]
}
}
json.dump(data, f)
with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f:
f.write("HELLO")
with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f:
f.write("LOG")
os.makedirs(os.path.join(path, "project-files", "snapshots"))
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
f.write("WORLD")
with aiozipstream.ZipFile() as z:
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
async_run(export_project(z, project, str(tmpdir), include_images=False))
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("vm-1/dynamips/test") as myfile:
content = myfile.read()
assert content == b"HELLO"
assert 'test.gns3' not in myzip.namelist()
assert 'project.gns3' in myzip.namelist()
assert 'project-files/snapshots/test' not in myzip.namelist()
assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist()
assert 'images/IOS/test.image' not in myzip.namelist()
2016-07-22 14:39:57 +03:00
with myzip.open("project.gns3") as myfile:
topo = json.loads(myfile.read().decode())["topology"]
assert topo["nodes"][0]["compute_id"] == "local" # All node should have compute_id local after export
2016-07-22 14:39:57 +03:00
assert topo["computes"] == []
def test_export_vm(tmpdir, project, async_run):
"""
If data is on a remote server export it locally before
sending it in the archive.
"""
compute = MagicMock()
compute.id = "vm"
compute.list_files = AsyncioMagicMock(return_value=[{"path": "vm-1/dynamips/test"}])
# Fake file that will be download from the vm
2016-09-19 17:51:15 +03:00
mock_response = AsyncioMagicMock()
mock_response.content = AsyncioBytesIO()
async_run(mock_response.content.write(b"HELLO"))
mock_response.content.seek(0)
compute.download_file = AsyncioMagicMock(return_value=mock_response)
project._project_created_on_compute.add(compute)
path = project.path
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
# The .gns3 should be renamed project.gns3 in order to simplify import
with open(os.path.join(path, "test.gns3"), 'w+') as f:
f.write("{}")
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir)))
assert compute.list_files.called
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("vm-1/dynamips/test") as myfile:
content = myfile.read()
assert content == b"HELLO"
2016-07-21 21:17:36 +03:00
def test_export_disallow_running(tmpdir, project, node, async_run):
"""
Disallow export when a node is running
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
node._status = "started"
with pytest.raises(aiohttp.web.HTTPConflict):
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir)))
2016-07-21 21:17:36 +03:00
def test_export_disallow_some_type(tmpdir, project, async_run):
"""
Disallow export for some node type
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
2019-02-23 06:13:50 +02:00
"node_type": "vmware"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with pytest.raises(aiohttp.web.HTTPConflict):
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir)))
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir), allow_all_nodes=True))
# VirtualBox is always disallowed
topology = {
"topology": {
"nodes": [
{
"node_type": "virtualbox",
"properties": {
"linked_clone": True
}
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with pytest.raises(aiohttp.web.HTTPConflict):
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir), allow_all_nodes=True))
2016-07-21 21:17:36 +03:00
def test_export_fix_path(tmpdir, project, async_run):
"""
Fix absolute image path, except for Docker
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "/tmp/c3725-adventerprisek9-mz.124-25d.image"
},
"node_type": "dynamips"
},
{
"properties": {
"image": "gns3/webterm:lastest"
},
"node_type": "docker"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir)))
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("project.gns3") as myfile:
content = myfile.read().decode()
topology = json.loads(content)
assert topology["topology"]["nodes"][0]["properties"]["image"] == "c3725-adventerprisek9-mz.124-25d.image"
assert topology["topology"]["nodes"][1]["properties"]["image"] == "gns3/webterm:lastest"
2016-07-21 21:17:36 +03:00
def test_export_with_images(tmpdir, project, async_run):
"""
Fix absolute image path
"""
path = project.path
os.makedirs(str(tmpdir / "IOS"))
with open(str(tmpdir / "IOS" / "test.image"), "w+") as f:
f.write("AAA")
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "test.image"
},
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with aiozipstream.ZipFile() as z:
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
async_run(export_project(z, project, str(tmpdir), include_images=True))
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
myzip.getinfo("images/IOS/test.image")
def test_export_keep_compute_id(tmpdir, project, async_run):
"""
If we want to restore the same computes we could ask to keep them
in the file
"""
with open(os.path.join(project.path, "test.gns3"), 'w+') as f:
data = {
"topology": {
"computes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"host": "127.0.0.1",
"name": "Remote 1",
"port": 8001,
"protocol": "http"
}
],
"nodes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"node_type": "vpcs"
}
]
}
}
json.dump(data, f)
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir), keep_compute_id=True))
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("project.gns3") as myfile:
topo = json.loads(myfile.read().decode())["topology"]
assert topo["nodes"][0]["compute_id"] == "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0"
assert len(topo["computes"]) == 1
def test_export_images_from_vm(tmpdir, project, async_run):
"""
If data is on a remote server export it locally before
sending it in the archive.
"""
compute = MagicMock()
compute.id = "vm"
compute.list_files = AsyncioMagicMock(return_value=[
{"path": "vm-1/dynamips/test"}
])
# Fake file that will be download from the vm
mock_response = AsyncioMagicMock()
mock_response.content = AsyncioBytesIO()
async_run(mock_response.content.write(b"HELLO"))
mock_response.content.seek(0)
mock_response.status = 200
compute.download_file = AsyncioMagicMock(return_value=mock_response)
mock_response = AsyncioMagicMock()
mock_response.content = AsyncioBytesIO()
async_run(mock_response.content.write(b"IMAGE"))
mock_response.content.seek(0)
mock_response.status = 200
compute.download_image = AsyncioMagicMock(return_value=mock_response)
project._project_created_on_compute.add(compute)
path = project.path
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
topology = {
"topology": {
"nodes": [
{
"compute_id": "vm",
"properties": {
"image": "test.image"
},
"node_type": "dynamips"
}
]
}
}
# The .gns3 should be renamed project.gns3 in order to simplify import
with open(os.path.join(path, "test.gns3"), 'w+') as f:
f.write(json.dumps(topology))
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir), include_images=True))
assert compute.list_files.called
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("vm-1/dynamips/test") as myfile:
content = myfile.read()
assert content == b"HELLO"
with myzip.open("images/dynamips/test.image") as myfile:
content = myfile.read()
assert content == b"IMAGE"
def test_export_with_ignoring_snapshots(tmpdir, project, async_run):
with open(os.path.join(project.path, "test.gns3"), 'w+') as f:
data = {
"topology": {
"computes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"host": "127.0.0.1",
"name": "Remote 1",
"port": 8001,
"protocol": "http"
}
],
"nodes": [
{
"compute_id": "6b7149c8-7d6e-4ca0-ab6b-daa8ab567be0",
"node_type": "vpcs"
}
]
}
}
json.dump(data, f)
# create snapshot directory
snapshots_dir = os.path.join(project.path, 'snapshots')
os.makedirs(snapshots_dir)
Path(os.path.join(snapshots_dir, 'snap.gns3project')).touch()
with aiozipstream.ZipFile() as z:
async_run(export_project(z, project, str(tmpdir), keep_compute_id=True))
async_run(write_file(str(tmpdir / 'zipfile.zip'), z))
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
assert not os.path.join('snapshots', 'snap.gns3project') in [f.filename for f in myzip.filelist]