Offload slow file operations to threads for snapshots and project "save as". Ref #1187 #1307.

This commit is contained in:
grossmj 2018-04-28 16:01:43 +07:00
parent 20294e284c
commit 50a922f83e
10 changed files with 246 additions and 221 deletions

View File

@ -194,13 +194,13 @@ class Controller:
user=server_config.get("user", ""),
password=server_config.get("password", ""),
force=True)
except aiohttp.web_exceptions.HTTPConflict as e:
except aiohttp.web.HTTPConflict as e:
log.fatal("Can't access to the local server, make sure anything else is not running on the same port")
sys.exit(1)
for c in computes:
try:
yield from self.add_compute(**c)
except (aiohttp.web_exceptions.HTTPConflict, KeyError):
except (aiohttp.web.HTTPConflict, KeyError):
pass # Skip not available servers at loading
yield from self.load_projects()
try:
@ -228,7 +228,7 @@ class Controller:
try:
yield from compute.close()
# We don't care if a compute is down at this step
except (ComputeError, aiohttp.web_exceptions.HTTPError, OSError):
except (ComputeError, aiohttp.web.HTTPError, OSError):
pass
yield from self.gns3vm.exit_vm()
self._computes = {}
@ -308,7 +308,7 @@ class Controller:
if file.endswith(".gns3"):
try:
yield from self.load_project(os.path.join(project_dir, file), load=False)
except (aiohttp.web_exceptions.HTTPConflict, NotImplementedError):
except (aiohttp.web.HTTPConflict, NotImplementedError):
pass # Skip not compatible projects
except OSError as e:
log.error(str(e))

View File

@ -29,122 +29,125 @@ log = logging.getLogger(__name__)
@asyncio.coroutine
def export_project(project, temporary_dir, include_images=False, keep_compute_id=False,
allow_all_nodes=False, ignore_prefixes=None):
def export_project(project, temporary_dir, include_images=False, keep_compute_id=False, allow_all_nodes=False):
"""
Export the project as zip. It's a ZipStream object.
The file will be read chunk by chunk when you iterate on
the zip.
Export a project to a zip file.
It will ignore some files like snapshots and
The file will be read chunk by chunk when you iterate over the zip stream.
Some files like snapshots and packet captures are ignored.
:param temporary_dir: A temporary dir where to store intermediate data
:param keep_compute_id: If false replace all compute id by local it's the standard behavior for .gns3project to make them portable
:param allow_all_nodes: Allow all nodes type to be include in the zip even if not portable default False
:param include images: save OS images to the zip file
:param keep_compute_id: If false replace all compute id by local (standard behavior for .gns3project to make it portable)
:param allow_all_nodes: Allow all nodes type to be include in the zip even if not portable
:returns: ZipStream object
"""
# To avoid issue with data not saved we disallow the export of a running topologie
# To avoid issue with data not saved we disallow the export of a running project
if project.is_running():
raise aiohttp.web.HTTPConflict(text="Running topology could not be exported")
raise aiohttp.web.HTTPConflict(text="Project must be stopped in order to export it")
# Make sure we save the project
project.dump()
z = zipstream.ZipFile(allowZip64=True)
zstream = zipstream.ZipFile(allowZip64=True)
if not os.path.exists(project._path):
raise aiohttp.web.HTTPNotFound(text="The project doesn't exist at location {}".format(project._path))
raise aiohttp.web.HTTPNotFound(text="Project could not be found at '{}'".format(project._path))
# First we process the .gns3 in order to be sure we don't have an error
for file in os.listdir(project._path):
if file.endswith(".gns3"):
images = yield from _export_project_file(project, os.path.join(project._path, file),
z, include_images, keep_compute_id, allow_all_nodes, temporary_dir)
yield from _patch_project_file(project, os.path.join(project._path, file), zstream, include_images, keep_compute_id, allow_all_nodes, temporary_dir)
# Export the local files
for root, dirs, files in os.walk(project._path, topdown=True):
files = [f for f in files if not _filter_files(os.path.join(root, f))]
files = [f for f in files if _is_exportable(os.path.join(root, f))]
for file in files:
path = os.path.join(root, file)
# Try open the file
# check if we can export the file
try:
open(path).close()
except OSError as e:
msg = "Could not export file {}: {}".format(path, e)
log.warn(msg)
msg = "Could not export file '{}': {}".format(path, e)
log.warning(msg)
project.controller.notification.emit("log.warning", {"message": msg})
continue
# ignore the .gns3 file
if file.endswith(".gns3"):
pass
else:
z.write(path, os.path.relpath(path, project._path), compress_type=zipfile.ZIP_DEFLATED)
continue
zstream.write(path, os.path.relpath(path, project._path), compress_type=zipfile.ZIP_DEFLATED)
# Export files from remote computes
downloaded_files = set()
for compute in project.computes:
if compute.id != "local":
compute_files = yield from compute.list_files(project)
for compute_file in compute_files:
if not _filter_files(compute_file["path"]):
if _is_exportable(compute_file["path"]):
(fd, temp_path) = tempfile.mkstemp(dir=temporary_dir)
f = open(fd, "wb", closefd=True)
response = yield from compute.download_file(project, compute_file["path"])
while True:
data = yield from response.content.read(512)
data = yield from response.content.read(1024)
if not data:
break
f.write(data)
response.close()
f.close()
z.write(temp_path, arcname=compute_file["path"], compress_type=zipfile.ZIP_DEFLATED)
zstream.write(temp_path, arcname=compute_file["path"], compress_type=zipfile.ZIP_DEFLATED)
downloaded_files.add(compute_file['path'])
return z
return zstream
def _filter_files(path):
def _is_exportable(path):
"""
:returns: True if file should not be included in the final archive
"""
s = os.path.normpath(path).split(os.path.sep)
# do not export snapshots
if path.endswith("snapshots"):
return True
return False
# filter directory of snapshots
# do not export directories of snapshots
if "{sep}snapshots{sep}".format(sep=os.path.sep) in path:
return True
return False
try:
# do not export captures and other temporary directory
s = os.path.normpath(path).split(os.path.sep)
i = s.index("project-files")
if s[i + 1] in ("tmp", "captures", "snapshots"):
return True
return False
except (ValueError, IndexError):
pass
file_name = os.path.basename(path)
# Ignore log files and OS noises
if file_name.endswith('_log.txt') or file_name.endswith('.log') or file_name == '.DS_Store':
return True
# do not export log files and OS noise
filename = os.path.basename(path)
if filename.endswith('_log.txt') or filename.endswith('.log') or filename == '.DS_Store':
return False
return True
@asyncio.coroutine
def _export_project_file(project, path, z, include_images, keep_compute_id, allow_all_nodes, temporary_dir):
def _patch_project_file(project, path, zstream, include_images, keep_compute_id, allow_all_nodes, temporary_dir):
"""
Take a project file (.gns3) and patch it for the export
Patch a project file (.gns3) to export a project.
The .gns3 file is renamed to project.gns3
We rename the .gns3 project.gns3 to avoid the task to the client to guess the file name
:param path: Path of the .gns3
:param path: path of the .gns3 file
"""
# Image file that we need to include in the exported archive
# image files that we need to include in the exported archive
images = []
try:
with open(path) as f:
topology = json.load(f)
except (OSError, ValueError) as e:
raise aiohttp.web.HTTPConflict(text="Project file '{}' cannot be read: {}".format(path, e))
if "topology" in topology:
if "nodes" in topology["topology"]:
@ -152,9 +155,9 @@ def _export_project_file(project, path, z, include_images, keep_compute_id, allo
compute_id = node.get('compute_id', 'local')
if node["node_type"] == "virtualbox" and node.get("properties", {}).get("linked_clone"):
raise aiohttp.web.HTTPConflict(text="Topology with a linked {} clone could not be exported. Use qemu instead.".format(node["node_type"]))
raise aiohttp.web.HTTPConflict(text="Projects with a linked {} clone node cannot not be exported. Please use Qemu instead.".format(node["node_type"]))
if not allow_all_nodes and node["node_type"] in ["virtualbox", "vmware", "cloud"]:
raise aiohttp.web.HTTPConflict(text="Topology with a {} could not be exported".format(node["node_type"]))
raise aiohttp.web.HTTPConflict(text="Projects with a {} node cannot be exported".format(node["node_type"]))
if not keep_compute_id:
node["compute_id"] = "local" # To make project portable all node by default run on local
@ -186,78 +189,69 @@ def _export_project_file(project, path, z, include_images, keep_compute_id, allo
local_images = set([i['image'] for i in images if i['compute_id'] == 'local'])
for image in local_images:
_export_local_images(project, image, z)
_export_local_image(image, zstream)
remote_images = set([
(i['compute_id'], i['image_type'], i['image'])
for i in images if i['compute_id'] != 'local'])
for compute_id, image_type, image in remote_images:
yield from _export_remote_images(project, compute_id, image_type, image, z, temporary_dir)
yield from _export_remote_images(project, compute_id, image_type, image, zstream, temporary_dir)
z.writestr("project.gns3", json.dumps(topology).encode())
zstream.writestr("project.gns3", json.dumps(topology).encode())
return images
def _export_local_images(project, image, z):
def _export_local_image(image, zstream):
"""
Take a project file (.gns3) and export images to the zip
Exports a local image to the zip file.
:param image: Image path
:param z: Zipfile instance for the export
:param image: image path
:param zstream: Zipfile instance for the export
"""
from ..compute import MODULES
for module in MODULES:
try:
img_directory = module.instance().get_images_directory()
images_directory = module.instance().get_images_directory()
except NotImplementedError:
# Some modules don't have images
continue
directory = os.path.split(img_directory)[-1:][0]
directory = os.path.split(images_directory)[-1:][0]
if os.path.exists(image):
path = image
else:
path = os.path.join(img_directory, image)
path = os.path.join(images_directory, image)
if os.path.exists(path):
arcname = os.path.join("images", directory, os.path.basename(image))
z.write(path, arcname)
zstream.write(path, arcname)
return
@asyncio.coroutine
def _export_remote_images(project, compute_id, image_type, image, project_zipfile, temporary_dir):
"""
Export specific image from remote compute
:param project:
:param compute_id:
:param image_type:
:param image:
:param project_zipfile:
:return:
Export specific image from remote compute.
"""
log.info("Obtaining image `{}` from `{}`".format(image, compute_id))
log.info("Downloading image '{}' from compute server '{}'".format(image, compute_id))
try:
compute = [compute for compute in project.computes if compute.id == compute_id][0]
except IndexError:
raise aiohttp.web.HTTPConflict(
text="Cannot export image from `{}` compute. Compute doesn't exist.".format(compute_id))
raise aiohttp.web.HTTPConflict(text="Cannot export image from '{}' compute. Compute doesn't exist.".format(compute_id))
(fd, temp_path) = tempfile.mkstemp(dir=temporary_dir)
f = open(fd, "wb", closefd=True)
response = yield from compute.download_image(image_type, image)
if response.status != 200:
raise aiohttp.web.HTTPConflict(
text="Cannot export image from `{}` compute. Compute sent `{}` status.".format(
compute_id, response.status))
raise aiohttp.web.HTTPConflict(text="Cannot export image from '{}' compute. Compute returned status code {}.".format(compute_id, response.status))
while True:
data = yield from response.content.read(512)
data = yield from response.content.read(1024)
if not data:
break
f.write(data)

View File

@ -26,7 +26,7 @@ import aiohttp
import itertools
from .topology import load_topology
from ..utils.asyncio import wait_run_in_executor
"""
Handle the import of project from a .gns3project
@ -38,7 +38,7 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
"""
Import a project contain in a zip file
You need to handle OSerror exceptions
You need to handle OSError exceptions
:param controller: GNS3 Controller
:param project_id: ID of the project to import
@ -46,6 +46,7 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
:param location: Directory for the project if None put in the default directory
:param name: Wanted project name, generate one from the .gns3 if None
:param keep_compute_id: If true do not touch the compute id
:returns: Project
"""
@ -53,11 +54,13 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
raise aiohttp.web.HTTPConflict(text="The destination path should not contain .gns3")
try:
with zipfile.ZipFile(stream) as myzip:
with zipfile.ZipFile(stream) as zip_file:
project_file = zip_file.read("project.gns3").decode()
except zipfile.BadZipFile:
raise aiohttp.web.HTTPConflict(text="Cannot import project, not a GNS3 project (invalid zip) or project.gns3 file could not be found")
try:
topology = json.loads(myzip.read("project.gns3").decode())
topology = json.loads(project_file)
# We import the project on top of an existing project (snapshots)
if topology["project_id"] == project_id:
project_name = topology["name"]
@ -67,8 +70,8 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
project_name = controller.get_free_project_name(name)
else:
project_name = controller.get_free_project_name(topology["name"])
except KeyError:
raise aiohttp.web.HTTPConflict(text="Can't import topology the .gns3 is corrupted or missing")
except (ValueError, KeyError):
raise aiohttp.web.HTTPConflict(text="Cannot import project, the project.gns3 file is corrupted")
if location:
path = location
@ -77,9 +80,14 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
path = os.path.join(projects_path, project_id)
try:
os.makedirs(path, exist_ok=True)
except UnicodeEncodeError as e:
except UnicodeEncodeError:
raise aiohttp.web.HTTPConflict(text="The project name contain non supported or invalid characters")
myzip.extractall(path)
try:
with zipfile.ZipFile(stream) as zip_file:
yield from wait_run_in_executor(zip_file.extractall, path)
except zipfile.BadZipFile:
raise aiohttp.web.HTTPConflict(text="Cannot extract files from GNS3 project (invalid zip)")
topology = load_topology(os.path.join(path, "project.gns3"))
topology["name"] = project_name
@ -124,17 +132,12 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
compute_created = set()
for node in topology["topology"]["nodes"]:
if node["compute_id"] != "local":
# Project created on the remote GNS3 VM?
if node["compute_id"] not in compute_created:
compute = controller.get_compute(node["compute_id"])
yield from compute.post("/projects", data={
"name": project_name,
"project_id": project_id,
})
yield from compute.post("/projects", data={"name": project_name, "project_id": project_id,})
compute_created.add(node["compute_id"])
yield from _move_files_to_compute(compute, project_id, path, os.path.join("project-files", node["node_type"], node["node_id"]))
# And we dump the updated.gns3
@ -150,18 +153,17 @@ def import_project(controller, project_id, stream, location=None, name=None, kee
project = yield from controller.load_project(dot_gns3_path, load=False)
return project
except zipfile.BadZipFile:
raise aiohttp.web.HTTPConflict(text="Can't import topology the file is corrupted or not a GNS3 project (invalid zip)")
def _move_node_file(path, old_id, new_id):
"""
Move the files from a node when changing his id
Move a file from a node when changing its id
:param path: Path of the project
:param old_id: ID before change
:param new_id: New node UUID
"""
root = os.path.join(path, "project-files")
if os.path.exists(root):
for dirname in os.listdir(root):
@ -175,8 +177,9 @@ def _move_node_file(path, old_id, new_id):
@asyncio.coroutine
def _move_files_to_compute(compute, project_id, directory, files_path):
"""
Move the files to a remote compute
Move files to a remote compute
"""
location = os.path.join(directory, files_path)
if os.path.exists(location):
for (dirpath, dirnames, filenames) in os.walk(location):
@ -184,7 +187,7 @@ def _move_files_to_compute(compute, project_id, directory, files_path):
path = os.path.join(dirpath, filename)
dst = os.path.relpath(path, directory)
yield from _upload_file(compute, project_id, path, dst)
shutil.rmtree(os.path.join(directory, files_path))
yield from wait_run_in_executor(shutil.rmtree, os.path.join(directory, files_path))
@asyncio.coroutine
@ -195,6 +198,7 @@ def _upload_file(compute, project_id, file_path, path):
:param file_path: File path on the controller file system
:param path: File path on the remote system relative to project directory
"""
path = "/projects/{}/files/{}".format(project_id, path.replace("\\", "/"))
with open(file_path, "rb") as f:
yield from compute.http_query("POST", path, f, timeout=None)
@ -202,11 +206,10 @@ def _upload_file(compute, project_id, file_path, path):
def _import_images(controller, path):
"""
Copy images to the images directory or delete them if they
already exists.
Copy images to the images directory or delete them if they already exists.
"""
image_dir = controller.images_path()
image_dir = controller.images_path()
root = os.path.join(path, "images")
for (dirpath, dirnames, filenames) in os.walk(root):
for filename in filenames:

View File

@ -37,6 +37,7 @@ from ..config import Config
from ..utils.path import check_path_allowed, get_default_project_directory
from ..utils.asyncio.pool import Pool
from ..utils.asyncio import locked_coroutine
from ..utils.asyncio import wait_run_in_executor
from .export_project import export_project
from .import_project import import_project
@ -631,27 +632,10 @@ class Project:
:param name: Name of the snapshot
"""
if name in [snap.name for snap in self.snapshots.values()]:
raise aiohttp.web_exceptions.HTTPConflict(text="The snapshot {} already exist".format(name))
if name in [snap.name for snap in self._snapshots.values()]:
raise aiohttp.web.HTTPConflict(text="The snapshot name {} already exists".format(name))
snapshot = Snapshot(self, name=name)
try:
if os.path.exists(snapshot.path):
raise aiohttp.web_exceptions.HTTPConflict(text="The snapshot {} already exist".format(name))
os.makedirs(os.path.join(self.path, "snapshots"), exist_ok=True)
with tempfile.TemporaryDirectory() as tmpdir:
zipstream = yield from export_project(self, tmpdir, keep_compute_id=True, allow_all_nodes=True)
try:
with open(snapshot.path, "wb") as f:
for data in zipstream:
f.write(data)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Could not write snapshot file '{}': {}".format(snapshot.path, e))
except OSError as e:
raise aiohttp.web.HTTPInternalServerError(text="Could not create project directory: {}".format(e))
yield from snapshot.create()
self._snapshots[snapshot.id] = snapshot
return snapshot
@ -857,6 +841,15 @@ class Project:
while self._loading:
yield from asyncio.sleep(0.5)
def _create_duplicate_project_file(self, path, zipstream):
"""
Creates the project file (to be run in its own thread)
"""
with open(path, "wb") as f:
for data in zipstream:
f.write(data)
@asyncio.coroutine
def duplicate(self, name=None, location=None):
"""
@ -878,10 +871,9 @@ class Project:
try:
with tempfile.TemporaryDirectory() as tmpdir:
zipstream = yield from export_project(self, tmpdir, keep_compute_id=True, allow_all_nodes=True)
with open(os.path.join(tmpdir, "project.gns3p"), "wb") as f:
for data in zipstream:
f.write(data)
with open(os.path.join(tmpdir, "project.gns3p"), "rb") as f:
project_path = os.path.join(tmpdir, "project.gns3p")
yield from wait_run_in_executor(self._create_duplicate_project_file, project_path, zipstream)
with open(project_path, "rb") as f:
project = yield from import_project(self._controller, str(uuid.uuid4()), f, location=location, name=name, keep_compute_id=True)
except (OSError, UnicodeEncodeError) as e:
raise aiohttp.web.HTTPConflict(text="Can not duplicate project: {}".format(str(e)))

View File

@ -19,11 +19,13 @@
import os
import uuid
import shutil
import tempfile
import asyncio
import aiohttp.web
from datetime import datetime, timezone
from ..utils.asyncio import wait_run_in_executor
from .export_project import export_project
from .import_project import import_project
@ -71,6 +73,37 @@ class Snapshot:
def created_at(self):
return int(self._created_at)
def _create_snapshot_file(self, zipstream):
"""
Creates the snapshot file (to be run in its own thread)
"""
with open(self.path, "wb") as f:
for data in zipstream:
f.write(data)
@asyncio.coroutine
def create(self):
"""
Create the snapshot
"""
if os.path.exists(self.path):
raise aiohttp.web.HTTPConflict(text="The snapshot file '{}' already exists".format(self.name))
snapshot_directory = os.path.join(self._project.path, "snapshots")
try:
os.makedirs(snapshot_directory, exist_ok=True)
except OSError as e:
raise aiohttp.web.HTTPInternalServerError(text="Could not create the snapshot directory '{}': {}".format(snapshot_directory, e))
try:
with tempfile.TemporaryDirectory() as tmpdir:
zipstream = yield from export_project(self._project, tmpdir, keep_compute_id=True, allow_all_nodes=True)
yield from wait_run_in_executor(self._create_snapshot_file, zipstream)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Could not create snapshot file '{}': {}".format(self.path, e))
@asyncio.coroutine
def restore(self):
"""
@ -78,18 +111,21 @@ class Snapshot:
"""
yield from self._project.delete_on_computes()
# We don't send close notif to clients because the close / open dance is purely internal
# We don't send close notification to clients because the close / open dance is purely internal
yield from self._project.close(ignore_notification=True)
self._project.controller.notification.emit("snapshot.restored", self.__json__())
try:
if os.path.exists(os.path.join(self._project.path, "project-files")):
shutil.rmtree(os.path.join(self._project.path, "project-files"))
# delete the current project files
project_files_path = os.path.join(self._project.path, "project-files")
if os.path.exists(project_files_path):
yield from wait_run_in_executor(shutil.rmtree, project_files_path)
with open(self._path, "rb") as f:
project = yield from import_project(self._project.controller, self._project.id, f, location=self._project.path)
except (OSError, PermissionError) as e:
raise aiohttp.web.HTTPConflict(text=str(e))
self._project.controller.notification.emit("snapshot.restored", self.__json__())
yield from project.open()
return project
return self._project
def __json__(self):
return {

View File

@ -319,7 +319,7 @@ class ProjectHandler:
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'wb+') as f:
while True:
packet = yield from request.content.read(512)
packet = yield from request.content.read(1024)
if not packet:
break
f.write(packet)
@ -380,7 +380,7 @@ class ProjectHandler:
try:
with tempfile.SpooledTemporaryFile(max_size=10000) as temp:
while True:
packet = yield from request.content.read(512)
packet = yield from request.content.read(1024)
if not packet:
break
temp.write(packet)

View File

@ -73,7 +73,7 @@ class LinkHandler:
node.get("adapter_number", 0),
node.get("port_number", 0),
label=node.get("label"))
except aiohttp.web_exceptions.HTTPException as e:
except aiohttp.web.HTTPException as e:
yield from project.delete_link(link.id)
raise e
response.set_status(201)

View File

@ -335,7 +335,7 @@ class ProjectHandler:
try:
with tempfile.SpooledTemporaryFile(max_size=10000) as temp:
while True:
packet = yield from request.content.read(512)
packet = yield from request.content.read(1024)
if not packet:
break
temp.write(packet)
@ -448,7 +448,7 @@ class ProjectHandler:
try:
with open(path, 'wb+') as f:
while True:
packet = yield from request.content.read(512)
packet = yield from request.content.read(1024)
if not packet:
break
f.write(packet)

View File

@ -133,7 +133,7 @@ class ServerHandler:
@Route.post(
r"/debug",
description="Dump debug informations to disk (debug directory in config directory). Work only for local server",
description="Dump debug information to disk (debug directory in config directory). Work only for local server",
status_codes={
201: "Writed"
})

View File

@ -66,7 +66,7 @@ class SymbolHandler:
try:
with open(path, 'wb') as f:
while True:
packet = yield from request.content.read(512)
packet = yield from request.content.read(1024)
if not packet:
break
f.write(packet)