The controller has a streaming API

Now we need to link the hypervisor to the controller.
This commit is contained in:
Julien Duponchelle 2016-03-17 17:32:37 +01:00
parent 76a0120d3e
commit de61ed316c
No known key found for this signature in database
GPG Key ID: F1E2485547D4595D
9 changed files with 255 additions and 53 deletions

View File

@ -18,9 +18,11 @@
import asyncio
import aiohttp
from uuid import UUID, uuid4
from contextlib import contextmanager
from .vm import VM
from .udp_link import UDPLink
from ..notification_queue import NotificationQueue
class Project:
@ -48,6 +50,7 @@ class Project:
self._hypervisors = set()
self._vms = {}
self._links = {}
self._listeners = set()
@property
def name(self):
@ -140,6 +143,29 @@ class Project:
for hypervisor in self._hypervisors:
yield from hypervisor.delete("/projects/{}".format(self._id))
@contextmanager
def queue(self):
"""
Get a queue of notifications
Use it with Python with
"""
queue = NotificationQueue()
self._listeners.add(queue)
yield queue
self._listeners.remove(queue)
def emit(self, action, event, **kwargs):
"""
Send an event to all the client listening for notifications
:param action: Action name
:param event: Event to send
:param kwargs: Add this meta to the notif (project_id for example)
"""
for listener in self._listeners:
listener.put_nowait((action, event, kwargs))
def __json__(self):
return {

View File

@ -51,6 +51,22 @@ class ProjectHandler:
response.set_status(201)
response.json(project)
@classmethod
@Route.get(
r"/projects/{project_id}",
description="Get the project",
parameters={
"project_id": "The UUID of the project",
},
status_codes={
200: "The project exist",
404: "The project doesn't exist"
})
def get(request, response):
controller = Controller.instance()
project = controller.getProject(request.match_info["project_id"])
response.json(project)
@classmethod
@Route.post(
r"/projects/{project_id}/commit",
@ -106,3 +122,62 @@ class ProjectHandler:
yield from project.delete()
controller.removeProject(project)
response.set_status(204)
@classmethod
@Route.get(
r"/projects/{project_id}/notifications",
description="Receive notifications about the projects",
parameters={
"project_id": "The UUID of the project",
},
status_codes={
200: "End of stream",
404: "The project doesn't exist"
})
def notification(request, response):
controller = Controller.instance()
project = controller.getProject(request.match_info["project_id"])
response.content_type = "application/json"
response.set_status(200)
response.enable_chunked_encoding()
# Very important: do not send a content lenght otherwise QT close the connection but curl can consume the Feed
response.content_length = None
response.start(request)
with project.queue() as queue:
while True:
try:
msg = yield from queue.get_json(5)
response.write(("{}\n".format(msg)).encode("utf-8"))
except asyncio.futures.CancelledError as e:
break
@classmethod
@Route.get(
r"/projects/{project_id}/notifications/ws",
description="Receive notifications about the projects via Websocket",
parameters={
"project_id": "The UUID of the project",
},
status_codes={
200: "End of stream",
404: "The project doesn't exist"
})
def notification_ws(request, response):
controller = Controller.instance()
project = controller.getProject(request.match_info["project_id"])
ws = aiohttp.web.WebSocketResponse()
yield from ws.prepare(request)
with project.queue() as queue:
while True:
try:
notif = yield from queue.get_json(5)
except asyncio.futures.CancelledError as e:
break
ws.send_str(notif)
return ws

View File

@ -35,6 +35,9 @@ class NotificationHandler:
with notifications.queue() as queue:
while True:
notif = yield from queue.get_json(5)
try:
notif = yield from queue.get_json(5)
except asyncio.futures.CancelledError as e:
break
ws.send_str(notif)
return ws

View File

@ -15,60 +15,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import psutil
import json
from contextlib import contextmanager
class NotificationQueue(asyncio.Queue):
"""
Queue returned by the notification manager.
"""
def __init__(self):
super().__init__()
self._first = True
@asyncio.coroutine
def get(self, timeout):
"""
When timeout is expire we send a ping notification with server informations
"""
# At first get we return a ping so the client receive immediately data
if self._first:
self._first = False
return ("ping", self._getPing(), {})
try:
(action, msg, kwargs) = yield from asyncio.wait_for(super().get(), timeout)
except asyncio.futures.TimeoutError:
return ("ping", self._getPing(), {})
return (action, msg, kwargs)
def _getPing(self):
"""
Return the content of the ping notification
"""
msg = {}
# Non blocking call in order to get cpu usage. First call will return 0
msg["cpu_usage_percent"] = psutil.cpu_percent(interval=None)
msg["memory_usage_percent"] = psutil.virtual_memory().percent
return msg
@asyncio.coroutine
def get_json(self, timeout):
"""
Get a message as a JSON
"""
(action, msg, kwargs) = yield from self.get(timeout)
if hasattr(msg, "__json__"):
msg = {"action": action, "event": msg.__json__()}
else:
msg = {"action": action, "event": msg}
msg.update(kwargs)
return json.dumps(msg, sort_keys=True)
from ..notification_queue import NotificationQueue
class NotificationManager:

View File

@ -0,0 +1,71 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import psutil
import json
import psutil
class NotificationQueue(asyncio.Queue):
"""
Queue returned by the notification manager.
"""
def __init__(self):
super().__init__()
self._first = True
@asyncio.coroutine
def get(self, timeout):
"""
When timeout is expire we send a ping notification with server informations
"""
# At first get we return a ping so the client receive immediately data
if self._first:
self._first = False
return ("ping", self._getPing(), {})
try:
(action, msg, kwargs) = yield from asyncio.wait_for(super().get(), timeout)
except asyncio.futures.TimeoutError:
return ("ping", self._getPing(), {})
return (action, msg, kwargs)
def _getPing(self):
"""
Return the content of the ping notification
"""
msg = {}
# Non blocking call in order to get cpu usage. First call will return 0
msg["cpu_usage_percent"] = psutil.cpu_percent(interval=None)
msg["memory_usage_percent"] = psutil.virtual_memory().percent
return msg
@asyncio.coroutine
def get_json(self, timeout):
"""
Get a message as a JSON
"""
(action, msg, kwargs) = yield from self.get(timeout)
if hasattr(msg, "__json__"):
msg = {"action": action, "event": msg.__json__()}
else:
msg = {"action": action, "event": msg}
msg.update(kwargs)
return json.dumps(msg, sort_keys=True)

View File

@ -1,4 +1,5 @@
{% extends "layout.html" %}
{% block body %}
<h1>
Controller status

View File

@ -1,4 +1,20 @@
{% extends "layout.html" %}
{% block head %}
<script>
var socket = new WebSocket("ws://" + location.host + "/v2/projects/{{project.id}}/notifications/ws");
socket.onopen = function (event) {
document.getElementById("notifications").innerText = "Connected";
};
socket.onmessage = function (event) {
document.getElementById("notifications").innerText = event.data + "\n" + document.getElementById("notifications").innerText;
};
</script>
{% endblock %}
{% block body %}
<h1>
{{project.name}}
@ -34,6 +50,8 @@ in futur GNS3 versions.
{% endfor %}
</table>
<h2>Notifications</h2>
<div id="notifications">
</div>
{%endblock%}

View File

@ -99,3 +99,15 @@ def test_getLink(async_run):
with pytest.raises(aiohttp.web_exceptions.HTTPNotFound):
project.getLink("test")
def test_emit(async_run):
project = Project()
with project.queue() as queue:
assert len(project._listeners) == 1
async_run(queue.get(0.1)) #  ping
project.emit('test', {})
notif = async_run(queue.get(5))
assert notif == ('test', {}, {})
assert len(project._listeners) == 0

View File

@ -24,6 +24,7 @@ import os
import asyncio
import aiohttp
import pytest
import json
from unittest.mock import patch
@ -86,6 +87,12 @@ def test_commit_project_invalid_uuid(http_controller):
assert response.status == 404
def test_get_project(http_controller, project):
response = http_controller.get("/projects/{project_id}".format(project_id=project.id), example=True)
assert response.status == 200
assert response.json["name"] == "test"
def test_delete_project(http_controller, project):
with asyncio_patch("gns3server.controller.project.Project.delete", return_value=True) as mock:
response = http_controller.delete("/projects/{project_id}".format(project_id=project.id), example=True)
@ -105,3 +112,42 @@ def test_close_project(http_controller, project):
assert response.status == 204
assert mock.called
assert project not in Controller.instance().projects
def test_notification(http_controller, project, loop):
@asyncio.coroutine
def go(future):
response = yield from aiohttp.request("GET", http_controller.get_url("/projects/{project_id}/notifications".format(project_id=project.id)))
response.body = yield from response.content.read(200)
project.emit("vm.created", {"a": "b"})
response.body += yield from response.content.read(50)
response.close()
future.set_result(response)
future = asyncio.Future()
asyncio.async(go(future))
response = loop.run_until_complete(future)
assert response.status == 200
assert b'"action": "ping"' in response.body
assert b'"cpu_usage_percent"' in response.body
assert b'{"action": "vm.created", "event": {"a": "b"}}\n' in response.body
def test_notification_invalid_id(http_controller):
response = http_controller.get("/projects/{project_id}/notifications".format(project_id=uuid.uuid4()))
assert response.status == 404
def test_notification_ws(http_controller, project, async_run):
ws = http_controller.websocket("/projects/{project_id}/notifications/ws".format(project_id=project.id))
answer = async_run(ws.receive())
answer = json.loads(answer.data)
assert answer["action"] == "ping"
project.emit("test", {})
answer = async_run(ws.receive())
answer = json.loads(answer.data)
assert answer["action"] == "test"
async_run(http_controller.close())