Reorganize export project code in order to move it to his own file

This commit is contained in:
Julien Duponchelle 2016-07-21 18:15:35 +02:00
parent 487e99bea5
commit 3300e9ec48
No known key found for this signature in database
GPG Key ID: CE8B29639E07F5E8
5 changed files with 337 additions and 260 deletions

View File

@ -0,0 +1,132 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import json
import aiohttp
import zipfile
import zipstream
def export_project(project, include_images=False):
"""
Export the project as zip. It's a ZipStream object.
The file will be read chunk by chunk when you iterate on
the zip.
It will ignore some files like snapshots and
:returns: ZipStream object
"""
# To avoid issue with data not saved we disallow the export of a running topologie
if project.is_running():
raise aiohttp.web.HTTPConflict(text="Running topology could not be exported")
z = zipstream.ZipFile()
# First we process the .gns3 in order to be sure we don't have an error
for file in os.listdir(project._path):
if file.endswith(".gns3"):
_export_project_file(project, os.path.join(project._path, file), z, include_images)
for root, dirs, files in os.walk(project._path, topdown=True):
# Remove snapshots and capture
if os.path.split(root)[-1:][0] == "project-files":
dirs[:] = [d for d in dirs if d not in ("snapshots", "tmp")]
# Ignore log files and OS noise
files = [f for f in files if not f.endswith('_log.txt') and not f.endswith('.log') and f != '.DS_Store']
for file in files:
path = os.path.join(root, file)
# Try open the file
try:
open(path).close()
except OSError as e:
msg = "Could not export file {}: {}".format(path, e)
log.warn(msg)
project.emit("log.warning", {"message": msg})
continue
if file.endswith(".gns3"):
pass
else:
z.write(path, os.path.relpath(path, project._path), compress_type=zipfile.ZIP_DEFLATED)
return z
def _export_project_file(project, path, z, include_images):
"""
Take a project file (.gns3) and patch it for the export
We rename the .gns3 project.gns3 to avoid the task to the client to guess the file name
:param path: Path of the .gns3
"""
# Image file that we need to include in the exported archive
images = set()
with open(path) as f:
topology = json.load(f)
if "topology" in topology and "nodes" in topology["topology"]:
for node in topology["topology"]["nodes"]:
if node["node_type"] in ["virtualbox", "vmware", "cloud"]:
raise aiohttp.web.HTTPConflict(text="Topology with a {} could not be exported".format(node["node_type"]))
if "properties" in node and node["node_type"] != "Docker":
for prop, value in node["properties"].items():
if prop.endswith("image"):
node["properties"][prop] = os.path.basename(value)
if include_images is True:
images.add(value)
for image in images:
_export_images(project, image, z)
z.writestr("project.gns3", json.dumps(topology).encode())
def _export_images(project, image, z):
"""
Take a project file (.gns3) and export images to the zip
:param image: Image path
:param z: Zipfile instance for the export
"""
from ..compute import MODULES
for module in MODULES:
try:
img_directory = module.instance().get_images_directory()
except NotImplementedError:
# Some modules don't have images
continue
directory = os.path.split(img_directory)[-1:][0]
if os.path.exists(image):
path = image
else:
path = os.path.join(img_directory, image)
if os.path.exists(path):
arcname = os.path.join("images", directory, os.path.basename(image))
z.write(path, arcname)
break

View File

@ -20,8 +20,6 @@ import json
import asyncio
import aiohttp
import shutil
import zipstream
import zipfile
from uuid import UUID, uuid4
@ -428,110 +426,6 @@ class Project:
drawing = yield from self.add_drawing(**drawing_data)
self._status = "opened"
def export(self, include_images=False):
"""
Export the project as zip. It's a ZipStream object.
The file will be read chunk by chunk when you iterate on
the zip.
It will ignore some files like snapshots and
:returns: ZipStream object
"""
# To avoid issue with data not saved we disallow the export of a running topologie
if self.is_running():
raise aiohttp.web.HTTPConflict(text="Running topology could not be exported")
z = zipstream.ZipFile()
# First we process the .gns3 in order to be sure we don't have an error
for file in os.listdir(self._path):
if file.endswith(".gns3"):
self._export_project_file(os.path.join(self._path, file), z, include_images)
for root, dirs, files in os.walk(self._path, topdown=True):
# Remove snapshots and capture
if os.path.split(root)[-1:][0] == "project-files":
dirs[:] = [d for d in dirs if d not in ("snapshots", "tmp")]
# Ignore log files and OS noise
files = [f for f in files if not f.endswith('_log.txt') and not f.endswith('.log') and f != '.DS_Store']
for file in files:
path = os.path.join(root, file)
# Try open the file
try:
open(path).close()
except OSError as e:
msg = "Could not export file {}: {}".format(path, e)
log.warn(msg)
self.emit("log.warning", {"message": msg})
continue
if file.endswith(".gns3"):
pass
else:
z.write(path, os.path.relpath(path, self._path), compress_type=zipfile.ZIP_DEFLATED)
return z
def _export_project_file(self, path, z, include_images):
"""
Take a project file (.gns3) and patch it for the export
We rename the .gns3 project.gns3 to avoid the task to the client to guess the file name
:param path: Path of the .gns3
"""
# Image file that we need to include in the exported archive
images = set()
with open(path) as f:
topology = json.load(f)
if "topology" in topology and "nodes" in topology["topology"]:
for node in topology["topology"]["nodes"]:
if node["node_type"] in ["virtualbox", "vmware", "cloud"]:
raise aiohttp.web.HTTPConflict(text="Topology with a {} could not be exported".format(node["node_type"]))
if "properties" in node and node["node_type"] != "Docker":
for prop, value in node["properties"].items():
if prop.endswith("image"):
node["properties"][prop] = os.path.basename(value)
if include_images is True:
images.add(value)
for image in images:
self._export_images(image, z)
z.writestr("project.gns3", json.dumps(topology).encode())
def _export_images(self, image, z):
"""
Take a project file (.gns3) and export images to the zip
:param image: Image path
:param z: Zipfile instance for the export
"""
from ..compute import MODULES
for module in MODULES:
try:
img_directory = module.instance().get_images_directory()
except NotImplementedError:
# Some modules don't have images
continue
directory = os.path.split(img_directory)[-1:][0]
if os.path.exists(image):
path = image
else:
path = os.path.join(img_directory, image)
if os.path.exists(path):
arcname = os.path.join("images", directory, os.path.basename(image))
z.write(path, arcname)
break
def is_running(self):
"""
If a node is started or paused return True

View File

@ -24,6 +24,7 @@ from gns3server.web.route import Route
from gns3server.controller import Controller
from gns3server.controller.project import Project
from gns3server.controller.import_project import import_project
from gns3server.controller.export_project import export_project
from gns3server.config import Config
@ -236,7 +237,7 @@ class ProjectHandler:
started = False
for data in project.export(include_images=bool(request.GET.get("include_images", "0"))):
for data in export_project(project, include_images=bool(request.GET.get("include_images", "0"))):
# We need to do that now because export could failed and raise an HTTP error
# that why response start need to be the later possible
if not started:

View File

@ -0,0 +1,203 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import json
import pytest
import aiohttp
import zipfile
from unittest.mock import patch
from unittest.mock import MagicMock
from tests.utils import AsyncioMagicMock
from gns3server.controller.project import Project
from gns3server.controller.export_project import export_project
@pytest.fixture
def project(controller):
return Project(controller=controller, name="Test")
@pytest.fixture
def node(controller, project, async_run):
compute = MagicMock()
compute.id = "local"
response = MagicMock()
response.json = {"console": 2048}
compute.post = AsyncioMagicMock(return_value=response)
node = async_run(project.add_node(compute, "test", None, node_type="vpcs", properties={"startup_config": "test.cfg"}))
return node
def test_export(tmpdir, project):
path = project.path
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
# The .gns3 should be renamed project.gns3 in order to simplify import
with open(os.path.join(path, "test.gns3"), 'w+') as f:
f.write("{}")
with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f:
f.write("HELLO")
with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f:
f.write("LOG")
os.makedirs(os.path.join(path, "project-files", "snapshots"))
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
f.write("WORLD")
z = export_project(project)
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("vm-1/dynamips/test") as myfile:
content = myfile.read()
assert content == b"HELLO"
assert 'test.gns3' not in myzip.namelist()
assert 'project.gns3' in myzip.namelist()
assert 'project-files/snapshots/test' not in myzip.namelist()
assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist()
def test_export_disallow_running(tmpdir, project, node):
"""
Dissallow export when a node is running
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
node._status = "started"
with pytest.raises(aiohttp.web.HTTPConflict):
z = export_project(project)
def test_export_disallow_some_type(tmpdir, project):
"""
Dissalow export for some node type
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"node_type": "virtualbox"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with pytest.raises(aiohttp.web.HTTPConflict):
z = export_project(project)
def test_export_fix_path(tmpdir, project):
"""
Fix absolute image path
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "/tmp/c3725-adventerprisek9-mz.124-25d.image"
},
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
z = export_project(project)
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("project.gns3") as myfile:
content = myfile.read().decode()
topology = json.loads(content)
assert topology["topology"]["nodes"][0]["properties"]["image"] == "c3725-adventerprisek9-mz.124-25d.image"
def test_export_with_images(tmpdir, project):
"""
Fix absolute image path
"""
path = project.path
os.makedirs(str(tmpdir / "IOS"))
with open(str(tmpdir / "IOS" / "test.image"), "w+") as f:
f.write("AAA")
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "test.image"
},
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
z = export_project(project, include_images=True)
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
myzip.getinfo("images/IOS/test.image")

View File

@ -331,159 +331,6 @@ def test_open_close(async_run, controller):
assert project.status == "closed"
def test_export(tmpdir, project):
path = project.path
os.makedirs(os.path.join(path, "vm-1", "dynamips"))
# The .gns3 should be renamed project.gns3 in order to simplify import
with open(os.path.join(path, "test.gns3"), 'w+') as f:
f.write("{}")
with open(os.path.join(path, "vm-1", "dynamips", "test"), 'w+') as f:
f.write("HELLO")
with open(os.path.join(path, "vm-1", "dynamips", "test_log.txt"), 'w+') as f:
f.write("LOG")
os.makedirs(os.path.join(path, "project-files", "snapshots"))
with open(os.path.join(path, "project-files", "snapshots", "test"), 'w+') as f:
f.write("WORLD")
z = project.export()
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("vm-1/dynamips/test") as myfile:
content = myfile.read()
assert content == b"HELLO"
assert 'test.gns3' not in myzip.namelist()
assert 'project.gns3' in myzip.namelist()
assert 'project-files/snapshots/test' not in myzip.namelist()
assert 'vm-1/dynamips/test_log.txt' not in myzip.namelist()
def test_export_disallow_running(tmpdir, project, node):
"""
Dissallow export when a node is running
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
node._status = "started"
with pytest.raises(aiohttp.web.HTTPConflict):
z = project.export()
def test_export_disallow_some_type(tmpdir, project):
"""
Dissalow export for some node type
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"node_type": "virtualbox"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with pytest.raises(aiohttp.web.HTTPConflict):
z = project.export()
def test_export_fix_path(tmpdir, project):
"""
Fix absolute image path
"""
path = project.path
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "/tmp/c3725-adventerprisek9-mz.124-25d.image"
},
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
z = project.export()
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
with myzip.open("project.gns3") as myfile:
content = myfile.read().decode()
topology = json.loads(content)
assert topology["topology"]["nodes"][0]["properties"]["image"] == "c3725-adventerprisek9-mz.124-25d.image"
def test_export_with_images(tmpdir, project):
"""
Fix absolute image path
"""
path = project.path
os.makedirs(str(tmpdir / "IOS"))
with open(str(tmpdir / "IOS" / "test.image"), "w+") as f:
f.write("AAA")
topology = {
"topology": {
"nodes": [
{
"properties": {
"image": "test.image"
},
"node_type": "dynamips"
}
]
}
}
with open(os.path.join(path, "test.gns3"), 'w+') as f:
json.dump(topology, f)
with patch("gns3server.compute.Dynamips.get_images_directory", return_value=str(tmpdir / "IOS"),):
z = project.export(include_images=True)
with open(str(tmpdir / 'zipfile.zip'), 'wb') as f:
for data in z:
f.write(data)
with zipfile.ZipFile(str(tmpdir / 'zipfile.zip')) as myzip:
myzip.getinfo("images/IOS/test.image")
def test_is_running(project, async_run, node):
"""
If a node is started or paused return True