Merge pull request #1275 from GNS3/async-md5-calcs

Calculate MD5 on thread and before json response, Ref. gui#2239
This commit is contained in:
Jeremy Grossmann 2018-01-29 23:28:11 +07:00 committed by GitHub
commit 62e03148c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 79 additions and 3 deletions

View File

@ -20,12 +20,17 @@ import os
import struct import struct
import stat import stat
import asyncio import asyncio
from asyncio.futures import CancelledError
import aiohttp import aiohttp
import socket import socket
import shutil import shutil
import re import re
import logging import logging
from gns3server.utils.asyncio import cancellable_wait_run_in_executor
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
from uuid import UUID, uuid4 from uuid import UUID, uuid4
@ -558,7 +563,7 @@ class BaseManager:
f.write(packet) f.write(packet)
os.chmod(tmp_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC) os.chmod(tmp_path, stat.S_IWRITE | stat.S_IREAD | stat.S_IEXEC)
shutil.move(tmp_path, path) shutil.move(tmp_path, path)
md5sum(path) yield from cancellable_wait_run_in_executor(md5sum, path)
except OSError as e: except OSError as e:
raise aiohttp.web.HTTPConflict(text="Could not write image: {} because {}".format(filename, e)) raise aiohttp.web.HTTPConflict(text="Could not write image: {} because {}".format(filename, e))

View File

@ -32,7 +32,7 @@ import gns3server
import subprocess import subprocess
from gns3server.utils import parse_version from gns3server.utils import parse_version
from gns3server.utils.asyncio import subprocess_check_output from gns3server.utils.asyncio import subprocess_check_output, cancellable_wait_run_in_executor
from .qemu_error import QemuError from .qemu_error import QemuError
from ..adapters.ethernet_adapter import EthernetAdapter from ..adapters.ethernet_adapter import EthernetAdapter
from ..nios.nio_udp import NIOUDP from ..nios.nio_udp import NIOUDP
@ -873,6 +873,22 @@ class QemuVM(BaseNode):
except (OSError, subprocess.SubprocessError) as e: except (OSError, subprocess.SubprocessError) as e:
raise QemuError("Could not throttle CPU: {}".format(e)) raise QemuError("Could not throttle CPU: {}".format(e))
@asyncio.coroutine
def create(self):
"""
Creates QEMU VM and sets proper MD5 hashes
"""
# In case user upload image manually we don't have md5 sums.
# We need generate hashes at this point, otherwise they will be generated
# at __json__ but not on separate thread.
yield from cancellable_wait_run_in_executor(md5sum, self._hda_disk_image)
yield from cancellable_wait_run_in_executor(md5sum, self._hdb_disk_image)
yield from cancellable_wait_run_in_executor(md5sum, self._hdc_disk_image)
yield from cancellable_wait_run_in_executor(md5sum, self._hdd_disk_image)
super(QemuVM, self).create()
@asyncio.coroutine @asyncio.coroutine
def start(self): def start(self):
""" """

View File

@ -20,6 +20,9 @@ import functools
import asyncio import asyncio
import sys import sys
import os import os
import threading
from asyncio.futures import CancelledError
@asyncio.coroutine @asyncio.coroutine
@ -40,6 +43,25 @@ def wait_run_in_executor(func, *args, **kwargs):
return future.result() return future.result()
@asyncio.coroutine
def cancellable_wait_run_in_executor(func, *args, **kwargs):
"""
Run blocking code in a different thread and wait
for the result. Support cancellation.
:param func: Run this function in a different thread
:param args: Parameters of the function
:param kwargs: Keyword parameters of the function
:returns: Return the result of the function
"""
stopped_event = threading.Event()
kwargs['stopped_event'] = stopped_event
try:
yield from wait_run_in_executor(func, *args, **kwargs)
except CancelledError:
stopped_event.set()
@asyncio.coroutine @asyncio.coroutine
def subprocess_check_output(*args, cwd=None, env=None, stderr=False): def subprocess_check_output(*args, cwd=None, env=None, stderr=False):
""" """

View File

@ -143,11 +143,13 @@ def images_directories(type):
return [force_unix_path(p) for p in paths if os.path.exists(p)] return [force_unix_path(p) for p in paths if os.path.exists(p)]
def md5sum(path): def md5sum(path, stopped_event=None):
""" """
Return the md5sum of an image and cache it on disk Return the md5sum of an image and cache it on disk
:param path: Path to the image :param path: Path to the image
:param stopped_event: In case you execute this function on thread and would like to have possibility
to cancel operation pass the `threading.Event`
:returns: Digest of the image :returns: Digest of the image
""" """
@ -167,6 +169,9 @@ def md5sum(path):
m = hashlib.md5() m = hashlib.md5()
with open(path, 'rb') as f: with open(path, 'rb') as f:
while True: while True:
if stopped_event is not None and stopped_event.is_set():
log.error("MD5 sum calculation of `{}` has stopped due to cancellation".format(path))
return
buf = f.read(128) buf = f.read(128)
if not buf: if not buf:
break break

View File

@ -89,6 +89,21 @@ def test_vm(project, manager, fake_qemu_binary):
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f" assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
def test_vm_create(loop, tmpdir, project, manager, fake_qemu_binary):
fake_img = str(tmpdir / 'hello')
with open(fake_img, 'w+') as f:
f.write('hello')
vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, qemu_path=fake_qemu_binary)
vm._hda_disk_image = fake_img
loop.run_until_complete(asyncio.ensure_future(vm.create()))
# tests if `create` created md5sums
assert os.path.exists(str(tmpdir / 'hello.md5sum'))
def test_vm_invalid_qemu_with_platform(project, manager, fake_qemu_binary): def test_vm_invalid_qemu_with_platform(project, manager, fake_qemu_binary):
vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, qemu_path="/usr/fake/bin/qemu-system-64", platform="x86_64") vm = QemuVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, qemu_path="/usr/fake/bin/qemu-system-64", platform="x86_64")

View File

@ -17,6 +17,7 @@
import os import os
import sys import sys
import threading
from unittest.mock import patch from unittest.mock import patch
@ -57,6 +58,18 @@ def test_md5sum(tmpdir):
assert f.read() == '5d41402abc4b2a76b9719d911017c592' assert f.read() == '5d41402abc4b2a76b9719d911017c592'
def test_md5sum_stopped_event(tmpdir):
fake_img = str(tmpdir / 'hello_stopped')
with open(fake_img, 'w+') as f:
f.write('hello')
event = threading.Event()
event.set()
assert md5sum(fake_img, stopped_event=event) is None
assert not os.path.exists(str(tmpdir / 'hello_stopped.md5sum'))
def test_md5sum_existing_digest(tmpdir): def test_md5sum_existing_digest(tmpdir):
fake_img = str(tmpdir / 'hello') fake_img = str(tmpdir / 'hello')