Notif forwarded from hypervisor to controller

This commit is contained in:
Julien Duponchelle 2016-03-18 16:55:54 +01:00
parent de61ed316c
commit bc14d5d78e
No known key found for this signature in database
GPG Key ID: F1E2485547D4595D
8 changed files with 174 additions and 46 deletions

View File

@ -209,7 +209,9 @@ The available notification are:
* vm.created * vm.created
* vm.started * vm.started
* vm.stopped * vm.stopped
* vm.deleted
* log.error * log.error
* log.warning
Previous versions Previous versions
================= =================

View File

@ -45,7 +45,7 @@ class Controller:
:param kwargs: See the documentation of Hypervisor :param kwargs: See the documentation of Hypervisor
""" """
if hypervisor_id not in self._hypervisors: if hypervisor_id not in self._hypervisors:
hypervisor = Hypervisor(hypervisor_id=hypervisor_id, **kwargs) hypervisor = Hypervisor(hypervisor_id=hypervisor_id, controller=self, **kwargs)
self._hypervisors[hypervisor_id] = hypervisor self._hypervisors[hypervisor_id] = hypervisor
return self._hypervisors[hypervisor_id] return self._hypervisors[hypervisor_id]
@ -109,3 +109,18 @@ class Controller:
if not hasattr(Controller, '_instance') or Controller._instance is None: if not hasattr(Controller, '_instance') or Controller._instance is None:
Controller._instance = Controller() Controller._instance = Controller()
return Controller._instance return Controller._instance
def emit(self, action, event, **kwargs):
"""
Send a notification to clients scoped by projects
"""
if "project_id" in kwargs:
try:
project_id = kwargs.pop("project_id")
self._projects[project_id].emit(action, event, **kwargs)
except KeyError:
pass
else:
for project in self._projects.values():
project.emit(action, event, **kwargs)

View File

@ -38,7 +38,8 @@ class Hypervisor:
A GNS3 hypervisor. A GNS3 hypervisor.
""" """
def __init__(self, hypervisor_id, protocol="http", host="localhost", port=8000, user=None, password=None): def __init__(self, hypervisor_id, controller=None, protocol="http", host="localhost", port=8000, user=None, password=None):
assert controller is not None
log.info("Create hypervisor %s", hypervisor_id) log.info("Create hypervisor %s", hypervisor_id)
self._id = hypervisor_id self._id = hypervisor_id
self._protocol = protocol self._protocol = protocol
@ -48,15 +49,19 @@ class Hypervisor:
self._password = None self._password = None
self._setAuth(user, password) self._setAuth(user, password)
self._connected = False self._connected = False
# The remote hypervisor version self._controller = controller
# TODO: For the moment it's fake we return the controller version self._session = aiohttp.ClientSession()
self._version = __version__
# If the hypervisor is local but the hypervisor id is local # If the hypervisor is local but the hypervisor id is local
# it's a configuration issue # it's a configuration issue
if hypervisor_id == "local" and Config.instance().get_section_config("Server")["local"] is False: if hypervisor_id == "local" and Config.instance().get_section_config("Server")["local"] is False:
raise HypervisorError("The local hypervisor is started without --local") raise HypervisorError("The local hypervisor is started without --local")
asyncio.async(self._connect())
def __del__(self):
self._session.close()
def _setAuth(self, user, password): def _setAuth(self, user, password):
""" """
Set authentication parameters Set authentication parameters
@ -110,6 +115,15 @@ class Hypervisor:
@asyncio.coroutine @asyncio.coroutine
def httpQuery(self, method, path, data=None): def httpQuery(self, method, path, data=None):
if not self._connected:
yield from self._connect()
return (yield from self._runHttpQuery(method, path, data=data))
@asyncio.coroutine
def _connect(self):
"""
Check if remote server is accessible
"""
if not self._connected: if not self._connected:
response = yield from self._runHttpQuery("GET", "/version") response = yield from self._runHttpQuery("GET", "/version")
if "version" not in response.json: if "version" not in response.json:
@ -117,20 +131,39 @@ class Hypervisor:
if parse_version(__version__)[:2] != parse_version(response.json["version"])[:2]: if parse_version(__version__)[:2] != parse_version(response.json["version"])[:2]:
raise aiohttp.web.HTTPConflict(text="The server {} versions are not compatible {} != {}".format(self._id, __version__, response.json["version"])) raise aiohttp.web.HTTPConflict(text="The server {} versions are not compatible {} != {}".format(self._id, __version__, response.json["version"]))
self._notifications = asyncio.async(self._connectNotification())
self._connected = True self._connected = True
return (yield from self._runHttpQuery(method, path, data=data))
@asyncio.coroutine
def _connectNotification(self):
"""
Connect to the notification stream
"""
ws = yield from self._session.ws_connect(self._getUrl("/notifications/ws"), auth=self._auth)
while True:
response = yield from ws.receive()
if response.tp == aiohttp.MsgType.closed or response.tp == aiohttp.MsgType.error:
self._connected = False
break
msg = json.loads(response.data)
action = msg.pop("action")
event = msg.pop("event")
self._controller.emit(action, event, hypervisor_id=self.id, **msg)
def _getUrl(self, path):
return "{}://{}:{}/v2/hypervisor{}".format(self._protocol, self._host, self._port, path)
@asyncio.coroutine @asyncio.coroutine
def _runHttpQuery(self, method, path, data=None): def _runHttpQuery(self, method, path, data=None):
with aiohttp.Timeout(10): with aiohttp.Timeout(10):
with aiohttp.ClientSession() as session: url = self._getUrl(path)
url = "{}://{}:{}/v2/hypervisor{}".format(self._protocol, self._host, self._port, path)
headers = {'content-type': 'application/json'} headers = {'content-type': 'application/json'}
if data: if data:
if hasattr(data, '__json__'): if hasattr(data, '__json__'):
data = data.__json__() data = data.__json__()
data = json.dumps(data) data = json.dumps(data)
response = yield from session.request(method, url, headers=headers, data=data, auth=self._auth) response = yield from self._session.request(method, url, headers=headers, data=data, auth=self._auth)
body = yield from response.read() body = yield from response.read()
if body: if body:
body = body.decode() body = body.decode()
@ -147,7 +180,7 @@ class Hypervisor:
elif response.status == 409: elif response.status == 409:
raise aiohttp.web.HTTPConflict(text="Conflict {} {}".format(url, body)) raise aiohttp.web.HTTPConflict(text="Conflict {} {}".format(url, body))
else: else:
raise NotImplemented("{} status code is not supported".format(e.status)) raise NotImplementedError("{} status code is not supported".format(response.status))
if body and len(body): if body and len(body):
try: try:
response.json = json.loads(body) response.json = json.loads(body)

View File

@ -42,14 +42,14 @@ class Link:
""" """
Create the link Create the link
""" """
raise NotImplemented raise NotImplementedError
@asyncio.coroutine @asyncio.coroutine
def delete(self): def delete(self):
""" """
Delete the link Delete the link
""" """
raise NotImplemented raise NotImplementedError
@property @property
def id(self): def id(self):

View File

@ -218,6 +218,7 @@ class BaseVM:
log.info("{module}: {name} [{id}] created".format(module=self.manager.module_name, log.info("{module}: {name} [{id}] created".format(module=self.manager.module_name,
name=self.name, name=self.name,
id=self.id)) id=self.id))
self._project.emit("vm.created", self)
@asyncio.coroutine @asyncio.coroutine
def delete(self): def delete(self):
@ -225,6 +226,7 @@ class BaseVM:
Delete the VM (including all its files). Delete the VM (including all its files).
""" """
self._project.emit("vm.deleted", self)
directory = self.project.vm_working_directory(self) directory = self.project.vm_working_directory(self)
if os.path.exists(directory): if os.path.exists(directory):
try: try:

View File

@ -77,7 +77,7 @@ def test_removeProject(controller, async_run):
def test_addProject_with_hypervisor(controller, async_run): def test_addProject_with_hypervisor(controller, async_run):
uuid1 = str(uuid.uuid4()) uuid1 = str(uuid.uuid4())
hypervisor = Hypervisor("test1") hypervisor = Hypervisor("test1", controller=MagicMock())
hypervisor.post = MagicMock() hypervisor.post = MagicMock()
controller._hypervisors = {"test1": hypervisor} controller._hypervisors = {"test1": hypervisor}
@ -92,3 +92,48 @@ def test_getProject(controller, async_run):
assert controller.getProject(uuid1) == project assert controller.getProject(uuid1) == project
with pytest.raises(aiohttp.web.HTTPNotFound): with pytest.raises(aiohttp.web.HTTPNotFound):
assert controller.getProject("dsdssd") assert controller.getProject("dsdssd")
def test_emit(controller, async_run):
project1 = MagicMock()
uuid1 = str(uuid.uuid4())
controller._projects[uuid1] = project1
project2 = MagicMock()
uuid2 = str(uuid.uuid4())
controller._projects[uuid2] = project2
# Notif without project should be send to all projects
controller.emit("test", {})
assert project1.emit.called
assert project2.emit.called
def test_emit_to_project(controller, async_run):
project1 = MagicMock()
uuid1 = str(uuid.uuid4())
controller._projects[uuid1] = project1
project2 = MagicMock()
uuid2 = str(uuid.uuid4())
controller._projects[uuid2] = project2
# Notif with project should be send to this project
controller.emit("test", {}, project_id=uuid1)
project1.emit.assert_called_with('test', {})
assert not project2.emit.called
def test_emit_to_project_not_exists(controller, async_run):
project1 = MagicMock()
uuid1 = str(uuid.uuid4())
controller._projects[uuid1] = project1
project2 = MagicMock()
uuid2 = str(uuid.uuid4())
controller._projects[uuid2] = project2
# Notif with project should be send to this project
controller.emit("test", {}, project_id="4444444")
assert not project1.emit.called
assert not project2.emit.called

View File

@ -19,6 +19,7 @@
import pytest import pytest
import json import json
import aiohttp import aiohttp
import asyncio
from unittest.mock import patch, MagicMock from unittest.mock import patch, MagicMock
from gns3server.controller.project import Project from gns3server.controller.project import Project
@ -29,7 +30,7 @@ from tests.utils import asyncio_patch, AsyncioMagicMock
@pytest.fixture @pytest.fixture
def hypervisor(): def hypervisor():
hypervisor = Hypervisor("my_hypervisor_id", protocol="https", host="example.com", port=84) hypervisor = Hypervisor("my_hypervisor_id", protocol="https", host="example.com", port=84, controller=MagicMock())
hypervisor._connected = True hypervisor._connected = True
return hypervisor return hypervisor
@ -46,10 +47,10 @@ def test_hypervisor_local(hypervisor):
with patch("gns3server.config.Config.get_section_config", return_value={"local": False}): with patch("gns3server.config.Config.get_section_config", return_value={"local": False}):
with pytest.raises(HypervisorError): with pytest.raises(HypervisorError):
s = Hypervisor("local") s = Hypervisor("local", controller=MagicMock())
with patch("gns3server.config.Config.get_section_config", return_value={"local": True}): with patch("gns3server.config.Config.get_section_config", return_value={"local": True}):
s = Hypervisor("test") s = Hypervisor("test", controller=MagicMock())
def test_hypervisor_httpQuery(hypervisor, async_run): def test_hypervisor_httpQuery(hypervisor, async_run):
@ -139,6 +140,35 @@ def test_hypervisor_httpQuery_project(hypervisor, async_run):
mock.assert_called_with("POST", "https://example.com:84/v2/hypervisor/projects", data=json.dumps(project.__json__()), headers={'content-type': 'application/json'}, auth=None) mock.assert_called_with("POST", "https://example.com:84/v2/hypervisor/projects", data=json.dumps(project.__json__()), headers={'content-type': 'application/json'}, auth=None)
def test_connectNotification(hypervisor, async_run):
ws_mock = AsyncioMagicMock()
call = 0
@asyncio.coroutine
def receive():
nonlocal call
call += 1
if call == 1:
response = MagicMock()
response.data = '{"action": "test", "event": {"a": 1}, "project_id": "42"}'
response.tp = aiohttp.MsgType.text
return response
else:
response = MagicMock()
response.tp = aiohttp.MsgType.closed
return response
hypervisor._controller = MagicMock()
hypervisor._session = AsyncioMagicMock(return_value=ws_mock)
hypervisor._session.ws_connect = AsyncioMagicMock(return_value=ws_mock)
ws_mock.receive = receive
async_run(hypervisor._connectNotification())
hypervisor._controller.emit.assert_called_with('test', {'a': 1}, hypervisor_id=hypervisor.id, project_id='42')
assert hypervisor._connected is False
def test_json(hypervisor): def test_json(hypervisor):
hypervisor.user = "test" hypervisor.user = "test"
assert hypervisor.__json__() == { assert hypervisor.__json__() == {

View File

@ -16,6 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest import pytest
from unittest.mock import MagicMock
from gns3server.controller.link import Link from gns3server.controller.link import Link
from gns3server.controller.vm import VM from gns3server.controller.vm import VM
@ -30,7 +31,7 @@ def project():
@pytest.fixture @pytest.fixture
def hypervisor(): def hypervisor():
return Hypervisor("example.com") return Hypervisor("example.com", controller=MagicMock())
def test_addVM(async_run, project, hypervisor): def test_addVM(async_run, project, hypervisor):