diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index d93ac1f0..f739553f 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -150,9 +150,14 @@ class Compute: self._controller.save() async def close(self): + self._connected = False if self._http_session and not self._http_session.closed: await self._http_session.close() + try: + await self._notifications + except asyncio.CancelledError: + pass self._closed = True @property @@ -417,25 +422,36 @@ class Compute: Connect to the notification stream """ - async with self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth) as ws: - async for response in ws: - if response.type == aiohttp.WSMsgType.TEXT and response.data: - msg = json.loads(response.data) - action = msg.pop("action") - event = msg.pop("event") - project_id = msg.pop("project_id", None) - if action == "ping": - self._cpu_usage_percent = event["cpu_usage_percent"] - self._memory_usage_percent = event["memory_usage_percent"] - #FIXME: slow down number of compute events - self._controller.notification.controller_emit("compute.updated", self.__json__()) + ws_url = self._getUrl("/notifications/ws") + try: + async with self._session().ws_connect(ws_url, auth=self._auth, heartbeat=10) as ws: + log.info("Connected to compute WebSocket '{}'".format(ws_url)) + async for response in ws: + if response.type == aiohttp.WSMsgType.TEXT: + msg = json.loads(response.data) + action = msg.pop("action") + event = msg.pop("event") + project_id = msg.pop("project_id", None) + if action == "ping": + self._cpu_usage_percent = event["cpu_usage_percent"] + self._memory_usage_percent = event["memory_usage_percent"] + #FIXME: slow down number of compute events + self._controller.notification.controller_emit("compute.updated", self.__json__()) + else: + await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id) else: - await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id) - elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None: - self._connected = False - break + if response.type == aiohttp.WSMsgType.CLOSE: + await ws.close() + elif response.type == aiohttp.WSMsgType.ERROR: + log.error("Error received on compute WebSocket '{}': {}".format(ws_url, ws.exception())) + elif response.type == aiohttp.WSMsgType.CLOSED: + pass + self._connected = False + break + finally: + log.info("Connection closed to compute WebSocket '{}'".format(ws_url)) - # Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb) + # Try to reconnect after 1 second if server unavailable only if not during tests (otherwise we create a ressources usage bomb) if not hasattr(sys, "_called_from_test") or not sys._called_from_test: asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect())) diff --git a/gns3server/handlers/api/compute/notification_handler.py b/gns3server/handlers/api/compute/notification_handler.py index 75b6f8b1..194d1fe5 100644 --- a/gns3server/handlers/api/compute/notification_handler.py +++ b/gns3server/handlers/api/compute/notification_handler.py @@ -21,6 +21,9 @@ from aiohttp.web import WebSocketResponse from gns3server.web.route import Route from gns3server.compute.notification_manager import NotificationManager +import logging +log = logging.getLogger(__name__) + async def process_websocket(ws): """ @@ -44,7 +47,7 @@ class NotificationHandler: request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) - + log.info("New client has connected to compute WebSocket") try: with notifications.queue() as queue: while True: @@ -53,6 +56,7 @@ class NotificationHandler: break await ws.send_str(notification) finally: + log.info("Client has disconnected from compute WebSocket") if not ws.closed: await ws.close() request.app['websockets'].discard(ws) diff --git a/gns3server/handlers/api/controller/notification_handler.py b/gns3server/handlers/api/controller/notification_handler.py index 04c118e0..bf41ad20 100644 --- a/gns3server/handlers/api/controller/notification_handler.py +++ b/gns3server/handlers/api/controller/notification_handler.py @@ -21,6 +21,9 @@ from aiohttp.web import WebSocketResponse from gns3server.web.route import Route from gns3server.controller import Controller +import logging +log = logging.getLogger(__name__) + async def process_websocket(ws): """ @@ -67,6 +70,7 @@ class NotificationHandler: request.app['websockets'].add(ws) asyncio.ensure_future(process_websocket(ws)) + log.info("New client has connected to controller WebSocket") try: with controller.notification.controller_queue() as queue: while True: @@ -75,6 +79,7 @@ class NotificationHandler: break await ws.send_str(notification) finally: + log.info("Client has disconnected from controller WebSocket") if not ws.closed: await ws.close() request.app['websockets'].discard(ws)