A notification stream with process monitoring

This commit is contained in:
Julien Duponchelle 2015-03-04 16:01:56 +01:00
parent 57f5e7a7d9
commit e9ec5c8a37
15 changed files with 256 additions and 25 deletions

View File

@ -15,14 +15,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import json
from ...web.route import Route
from ...schemas.project import PROJECT_OBJECT_SCHEMA, PROJECT_CREATE_SCHEMA, PROJECT_UPDATE_SCHEMA
from ...modules.project_manager import ProjectManager
from ...modules import MODULES
import logging
log = logging.getLogger()
class ProjectHandler:
# How many clients has subcribe to notifications
_notifications_listening = 0
@classmethod
@Route.post(
r"/projects",
@ -123,8 +132,9 @@ class ProjectHandler:
pm = ProjectManager.instance()
project = pm.get_project(request.match_info["project_id"])
yield from project.close()
pm.remove_project(project.id)
if ProjectHandler._notifications_listening == 0:
yield from project.close()
pm.remove_project(project.id)
response.set_status(204)
@classmethod
@ -145,3 +155,45 @@ class ProjectHandler:
yield from project.delete()
pm.remove_project(project.id)
response.set_status(204)
@classmethod
@Route.get(
r"/projects/{project_id}/notifications",
description="Receive notifications about the projects",
parameters={
"project_id": "The UUID of the project",
},
status_codes={
200: "End of stream",
404: "The project doesn't exist"
})
def notification(request, response):
pm = ProjectManager.instance()
project = pm.get_project(request.match_info["project_id"])
response.content_type = "application/json"
response.set_status(200)
response.enable_chunked_encoding()
# Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed
response.content_length = None
response.start(request)
queue = project.get_listen_queue()
ProjectHandler._notifications_listening += 1
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
while True:
try:
(action, msg) = yield from asyncio.wait_for(queue.get(), 5)
if hasattr(msg, "__json__"):
msg = json.dumps({"action": action, "event": msg.__json__()}, sort_keys=True)
else:
msg = json.dumps({"action": action, "event": msg}, sort_keys=True)
log.debug("Send notification: %s", msg)
response.write(("{}\n".format(msg)).encode("utf-8"))
except asyncio.futures.CancelledError as e:
break
except asyncio.futures.TimeoutError as e:
response.write("{\"action\": \"ping\"}\n".encode("utf-8"))
project.stop_listen_queue(queue)
ProjectHandler._notifications_listening -= 1

View File

@ -48,6 +48,7 @@ class BaseVM:
self._manager = manager
self._console = console
self._temporary_directory = None
self._vm_status = "stopped"
if self._console is not None:
self._console = self._manager.port_manager.reserve_tcp_port(self._console, self._project)
@ -66,6 +67,18 @@ class BaseVM:
if os.path.exists(self._temporary_directory):
shutil.rmtree(self._temporary_directory, ignore_errors=True)
@property
def status(self):
"""Return current VM status"""
return self._vm_status
@status.setter
def status(self, status):
self._vm_status = status
self._project.emit("vm.{}".format(status), self)
@property
def project(self):
"""

View File

@ -70,6 +70,16 @@ class Hypervisor(DynamipsHypervisor):
return self._id
@property
def process(self):
"""
Returns the subprocess of the Hypervisor
:returns: subprocess
"""
return self._process
@property
def started(self):
"""

View File

@ -35,7 +35,8 @@ from ...base_vm import BaseVM
from ..dynamips_error import DynamipsError
from ..nios.nio_udp import NIOUDP
from gns3server.utils.asyncio import wait_run_in_executor
from gns3server.config import Config
from gns3server.utils.asyncio import wait_run_in_executor, monitor_process
class Router(BaseVM):
@ -162,7 +163,7 @@ class Router(BaseVM):
slot_number += 1
# add the wics
if self._slots[0] and self._slots[0].wics:
if len(self._slots) > 0 and self._slots[0] and self._slots[0].wics:
for wic_slot_number in range(0, len(self._slots[0].wics)):
if self._slots[0].wics[wic_slot_number]:
router_info["wic" + str(wic_slot_number)] = str(self._slots[0].wics[wic_slot_number])
@ -251,7 +252,18 @@ class Router(BaseVM):
raise DynamipsError('"{}" is not a valid IOS image'.format(self._image))
yield from self._hypervisor.send('vm start "{name}"'.format(name=self._name))
self.status = "started"
log.info('router "{name}" [{id}] has been started'.format(name=self._name, id=self._id))
monitor_process(self._hypervisor.process, self._termination_callback)
@asyncio.coroutine
def _termination_callback(self, returncode):
"""
Called when the process is killed
:param returncode: Process returncode
"""
self.status = "stopped"
@asyncio.coroutine
def stop(self):
@ -262,6 +274,7 @@ class Router(BaseVM):
status = yield from self.get_status()
if status != "inactive":
yield from self._hypervisor.send('vm stop "{name}"'.format(name=self._name))
self.status = "stopped"
log.info('Router "{name}" [{id}] has been stopped'.format(name=self._name, id=self._id))
@asyncio.coroutine

View File

@ -465,6 +465,8 @@ class IOUVM(BaseVM):
env=env)
log.info("IOU instance {} started PID={}".format(self._id, self._iou_process.pid))
self._started = True
self.status = "started"
gns3server.utils.asyncio.monitor_process(self._iou_process, self._termination_callback)
except FileNotFoundError as e:
raise IOUError("Could not start IOU: {}: 32-bit binary support is probably not installed".format(e))
except (OSError, subprocess.SubprocessError) as e:
@ -477,6 +479,17 @@ class IOUVM(BaseVM):
# connections support
yield from self._start_iouyap()
def _termination_callback(self, returncode):
"""
Called when the process is killed
:param returncode: Process returncode
"""
log.info("IOU process crash return code: %d", returncode)
self._terminate_process_iou()
self._terminate_process_iouyap()
self._ioucon_thread_stop_event.set()
def _rename_nvram_file(self):
"""
Before starting the VM, rename the nvram and vlan.dat files with the correct IOU application identifier.
@ -507,6 +520,7 @@ class IOUVM(BaseVM):
stderr=subprocess.STDOUT,
cwd=self.working_dir)
gns3server.utils.asyncio.monitor_process(self._iouyap_process, self._termination_callback)
log.info("iouyap started PID={}".format(self._iouyap_process.pid))
except (OSError, subprocess.SubprocessError) as e:
iouyap_stdout = self.read_iouyap_stdout()
@ -615,24 +629,28 @@ class IOUVM(BaseVM):
Terminate the IOUYAP process if running.
"""
log.info('Stopping IOUYAP process for IOU VM "{}" PID={}'.format(self.name, self._iouyap_process.pid))
try:
self._iouyap_process.terminate()
# Sometime the process may already be dead when we garbage collect
except ProcessLookupError:
pass
if self._iouyap_process:
log.info('Stopping IOUYAP process for IOU VM "{}" PID={}'.format(self.name, self._iouyap_process.pid))
try:
self._iouyap_process.terminate()
# Sometime the process can already be dead when we garbage collect
except ProcessLookupError:
pass
def _terminate_process_iou(self):
"""
Terminate the IOU process if running
"""
log.info('Stopping IOU process for IOU VM "{}" PID={}'.format(self.name, self._iou_process.pid))
try:
self._iou_process.terminate()
# Sometime the process may already be dead when we garbage collect
except ProcessLookupError:
pass
if self._iou_process:
log.info('Stopping IOU process for IOU VM "{}" PID={}'.format(self.name, self._iou_process.pid))
try:
self._iou_process.terminate()
# Sometime the process can already be dead when we garbage collect
except ProcessLookupError:
pass
self._started = False
self.status = "stopped"
@asyncio.coroutine
def reload(self):

View File

@ -541,7 +541,6 @@ def send_recv_loop(epoll, console, router, esc_char, stop_event):
else:
router.write(buf)
finally:
log.debug("Finally")
router.unregister(epoll)
console.unregister(epoll)

View File

@ -65,6 +65,9 @@ class Project:
self._used_tcp_ports = set()
self._used_udp_ports = set()
# List of clients listen for notifications
self._listeners = set()
if path is None:
path = os.path.join(self._location, self._id)
try:
@ -416,3 +419,26 @@ class Project:
# We import it at the last time to avoid circular dependencies
from ..modules import MODULES
return MODULES
def emit(self, action, event):
"""
Send an event to all the client listens for notifications
:param action: Action happened
:param event: Event sended to the client
"""
for listener in self._listeners:
listener.put_nowait((action, event, ))
def get_listen_queue(self):
"""Get a queue where you receive all the events related to the
project."""
queue = asyncio.Queue()
self._listeners.add(queue)
return queue
def stop_listen_queue(self, queue):
"""Stop sending notification to this clients"""
self._listeners.remove(queue)

View File

@ -35,6 +35,8 @@ from ..nios.nio_udp import NIOUDP
from ..nios.nio_tap import NIOTAP
from ..base_vm import BaseVM
from ...schemas.qemu import QEMU_OBJECT_SCHEMA
from ...utils.asyncio import monitor_process
from ...config import Config
import logging
log = logging.getLogger(__name__)
@ -62,7 +64,6 @@ class QemuVM(BaseVM):
self._host = server_config.get("host", "127.0.0.1")
self._monitor_host = server_config.get("monitor_host", "127.0.0.1")
self._command = []
self._started = False
self._process = None
self._cpulimit_process = None
self._monitor = None
@ -581,7 +582,9 @@ class QemuVM(BaseVM):
stderr=subprocess.STDOUT,
cwd=self.working_dir)
log.info('QEMU VM "{}" started PID={}'.format(self._name, self._process.pid))
self._started = True
self.status = "started"
monitor_process(self._process, self._termination_callback)
except (OSError, subprocess.SubprocessError) as e:
stdout = self.read_stdout()
log.error("Could not start QEMU {}: {}\n{}".format(self.qemu_path, e, stdout))
@ -591,6 +594,17 @@ class QemuVM(BaseVM):
if self._cpu_throttling:
self._set_cpu_throttling()
def _termination_callback(self, returncode):
"""
Called when the process is killed
:param returncode: Process returncode
"""
if self.started:
log.info("Process Qemu is dead. Return code: %d", returncode)
self.status = "stopped"
self._process = None
@asyncio.coroutine
def stop(self):
"""
@ -608,7 +622,7 @@ class QemuVM(BaseVM):
if self._process.returncode is None:
log.warn('QEMU VM "{}" PID={} is still running'.format(self._name, self._process.pid))
self._process = None
self._started = False
self.status = "stopped"
self._stop_cpulimit()
@asyncio.coroutine
@ -807,7 +821,7 @@ class QemuVM(BaseVM):
:returns: boolean
"""
return self._started
return self.status == "started"
def read_stdout(self):
"""

View File

@ -35,7 +35,7 @@ from ..adapters.ethernet_adapter import EthernetAdapter
from ..nios.nio_udp import NIOUDP
from ..nios.nio_tap import NIOTAP
from ..base_vm import BaseVM
from ...utils.asyncio import subprocess_check_output
from ...utils.asyncio import subprocess_check_output, monitor_process
import logging
@ -109,6 +109,7 @@ class VPCSVM(BaseVM):
return {"name": self.name,
"vm_id": self.id,
"status": self.status,
"console": self._console,
"project_id": self.project.id,
"startup_script": self.startup_script,
@ -233,13 +234,27 @@ class VPCSVM(BaseVM):
stderr=subprocess.STDOUT,
cwd=self.working_dir,
creationflags=flags)
monitor_process(self._process, self._termination_callback)
log.info("VPCS instance {} started PID={}".format(self.name, self._process.pid))
self._started = True
self.status = "started"
except (OSError, subprocess.SubprocessError) as e:
vpcs_stdout = self.read_vpcs_stdout()
log.error("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
raise VPCSError("Could not start VPCS {}: {}\n{}".format(self.vpcs_path, e, vpcs_stdout))
def _termination_callback(self, returncode):
"""
Called when the process is killed
:param returncode: Process returncode
"""
if self._started:
log.info("Process VPCS is dead. Return code: %d", returncode)
self._started = False
self.status = "stopped"
self._process = None
@asyncio.coroutine
def stop(self):
"""
@ -258,6 +273,7 @@ class VPCSVM(BaseVM):
self._process = None
self._started = False
self.status = "stopped"
@asyncio.coroutine
def reload(self):

View File

@ -92,6 +92,10 @@ VPCS_OBJECT_SCHEMA = {
"maxLength": 36,
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
},
"status": {
"description": "VM status",
"enum": ["started", "stopped"]
},
"console": {
"description": "console TCP port",
"minimum": 1,
@ -115,5 +119,5 @@ VPCS_OBJECT_SCHEMA = {
},
},
"additionalProperties": False,
"required": ["name", "vm_id", "console", "project_id", "startup_script_path"]
"required": ["name", "vm_id", "status", "console", "project_id", "startup_script_path"]
}

View File

@ -17,6 +17,8 @@
import asyncio
import shutil
import sys
@asyncio.coroutine
@ -76,3 +78,19 @@ def wait_for_process_termination(process, timeout=10):
yield from asyncio.sleep(0.1)
timeout -= 0.1
raise asyncio.TimeoutError()
@asyncio.coroutine
def _check_process(process, termination_callback):
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
returncode = yield from process.wait()
if asyncio.iscoroutinefunction(termination_callback):
yield from termination_callback(returncode)
else:
termination_callback(returncode)
def monitor_process(process, termination_callback):
"""Call termination_callback when process die"""
asyncio.async(_check_process(process, termination_callback))

View File

@ -20,9 +20,13 @@ This test suite check /project endpoint
"""
import uuid
import asyncio
import aiohttp
from unittest.mock import patch
from tests.utils import asyncio_patch
from gns3server.handlers.api.project_handler import ProjectHandler
def test_create_project_with_path(server, tmpdir):
with patch("gns3server.modules.project.Project.is_local", return_value=True):
@ -139,6 +143,38 @@ def test_close_project(server, project):
assert mock.called
def test_close_project_two_client_connected(server, project):
ProjectHandler._notifications_listening = 2
with asyncio_patch("gns3server.modules.project.Project.close", return_value=True) as mock:
response = server.post("/projects/{project_id}/close".format(project_id=project.id), example=True)
assert response.status == 204
assert not mock.called
def test_close_project_invalid_uuid(server):
response = server.post("/projects/{project_id}/close".format(project_id=uuid.uuid4()))
assert response.status == 404
def test_notification(server, project, loop):
@asyncio.coroutine
def go(future):
response = yield from aiohttp.request("GET", server.get_url("/projects/{project_id}/notifications".format(project_id=project.id), 1))
response.body = yield from response.content.read(19)
project.emit("vm.created", {"a": "b"})
response.body += yield from response.content.read(47)
response.close()
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))
response = loop.run_until_complete(future)
assert response.status == 200
assert response.body == b'{"action": "ping"}\n{"action": "vm.created", "event": {"a": "b"}}\n'
def test_notification_invalid_id(server, project):
response = server.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4()))
assert response.status == 404

View File

@ -42,7 +42,8 @@ def test_vpcs_get(server, project, vm):
assert response.route == "/projects/{project_id}/vpcs/vms/{vm_id}"
assert response.json["name"] == "PC TEST 1"
assert response.json["project_id"] == project.id
assert response.json["startup_script_path"] == None
assert response.json["startup_script_path"] is None
assert response.json["status"] == "stopped"
def test_vpcs_create_startup_script(server, project):
@ -95,6 +96,7 @@ def test_vpcs_delete_nio(server, vm):
def test_vpcs_start(server, vm):
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM.start", return_value=True) as mock:
response = server.post("/projects/{project_id}/vpcs/vms/{vm_id}/start".format(project_id=vm["project_id"], vm_id=vm["vm_id"]), example=True)
assert mock.called

View File

@ -72,6 +72,7 @@ def test_vm_invalid_vpcs_path(project, manager, loop):
def test_start(loop, vm):
process = MagicMock()
process.returncode = None
queue = vm.project.get_listen_queue()
with asyncio_patch("gns3server.modules.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
@ -79,6 +80,9 @@ def test_start(loop, vm):
vm.port_add_nio_binding(0, nio)
loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running()
(action, event) = queue.get_nowait()
assert action == "vm.started"
assert event == vm
def test_stop(loop, vm):
@ -98,6 +102,8 @@ def test_stop(loop, vm):
loop.run_until_complete(asyncio.async(vm.start()))
assert vm.is_running()
queue = vm.project.get_listen_queue()
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
loop.run_until_complete(asyncio.async(vm.stop()))
assert vm.is_running() is False
@ -107,6 +113,10 @@ def test_stop(loop, vm):
else:
process.terminate.assert_called_with()
(action, event) = queue.get_nowait()
assert action == "vm.stopped"
assert event == vm
def test_reload(loop, vm):
process = MagicMock()

View File

@ -10,4 +10,4 @@ ignore = E501,E402
[pytest]
norecursedirs = old_tests .tox
timeout = 2
timeout = 5