Some adjustments with compute WebSocket handling. Ref https://github.com/GNS3/gns3-server/issues/1564

This commit is contained in:
grossmj 2019-04-14 16:48:12 +07:00
parent 6db8cecda5
commit 6dc58b28fd
3 changed files with 43 additions and 18 deletions

View File

@ -150,9 +150,14 @@ class Compute:
self._controller.save() self._controller.save()
async def close(self): async def close(self):
self._connected = False self._connected = False
if self._http_session and not self._http_session.closed: if self._http_session and not self._http_session.closed:
await self._http_session.close() await self._http_session.close()
try:
await self._notifications
except asyncio.CancelledError:
pass
self._closed = True self._closed = True
@property @property
@ -417,25 +422,36 @@ class Compute:
Connect to the notification stream Connect to the notification stream
""" """
async with self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth) as ws: ws_url = self._getUrl("/notifications/ws")
async for response in ws: try:
if response.type == aiohttp.WSMsgType.TEXT and response.data: async with self._session().ws_connect(ws_url, auth=self._auth, heartbeat=10) as ws:
msg = json.loads(response.data) log.info("Connected to compute WebSocket '{}'".format(ws_url))
action = msg.pop("action") async for response in ws:
event = msg.pop("event") if response.type == aiohttp.WSMsgType.TEXT:
project_id = msg.pop("project_id", None) msg = json.loads(response.data)
if action == "ping": action = msg.pop("action")
self._cpu_usage_percent = event["cpu_usage_percent"] event = msg.pop("event")
self._memory_usage_percent = event["memory_usage_percent"] project_id = msg.pop("project_id", None)
#FIXME: slow down number of compute events if action == "ping":
self._controller.notification.controller_emit("compute.updated", self.__json__()) self._cpu_usage_percent = event["cpu_usage_percent"]
self._memory_usage_percent = event["memory_usage_percent"]
#FIXME: slow down number of compute events
self._controller.notification.controller_emit("compute.updated", self.__json__())
else:
await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id)
else: else:
await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id) if response.type == aiohttp.WSMsgType.CLOSE:
elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None: await ws.close()
self._connected = False elif response.type == aiohttp.WSMsgType.ERROR:
break log.error("Error received on compute WebSocket '{}': {}".format(ws_url, ws.exception()))
elif response.type == aiohttp.WSMsgType.CLOSED:
pass
self._connected = False
break
finally:
log.info("Connection closed to compute WebSocket '{}'".format(ws_url))
# Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb) # Try to reconnect after 1 second if server unavailable only if not during tests (otherwise we create a ressources usage bomb)
if not hasattr(sys, "_called_from_test") or not sys._called_from_test: if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect())) asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect()))

View File

@ -21,6 +21,9 @@ from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route from gns3server.web.route import Route
from gns3server.compute.notification_manager import NotificationManager from gns3server.compute.notification_manager import NotificationManager
import logging
log = logging.getLogger(__name__)
async def process_websocket(ws): async def process_websocket(ws):
""" """
@ -44,7 +47,7 @@ class NotificationHandler:
request.app['websockets'].add(ws) request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws)) asyncio.ensure_future(process_websocket(ws))
log.info("New client has connected to compute WebSocket")
try: try:
with notifications.queue() as queue: with notifications.queue() as queue:
while True: while True:
@ -53,6 +56,7 @@ class NotificationHandler:
break break
await ws.send_str(notification) await ws.send_str(notification)
finally: finally:
log.info("Client has disconnected from compute WebSocket")
if not ws.closed: if not ws.closed:
await ws.close() await ws.close()
request.app['websockets'].discard(ws) request.app['websockets'].discard(ws)

View File

@ -21,6 +21,9 @@ from aiohttp.web import WebSocketResponse
from gns3server.web.route import Route from gns3server.web.route import Route
from gns3server.controller import Controller from gns3server.controller import Controller
import logging
log = logging.getLogger(__name__)
async def process_websocket(ws): async def process_websocket(ws):
""" """
@ -67,6 +70,7 @@ class NotificationHandler:
request.app['websockets'].add(ws) request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws)) asyncio.ensure_future(process_websocket(ws))
log.info("New client has connected to controller WebSocket")
try: try:
with controller.notification.controller_queue() as queue: with controller.notification.controller_queue() as queue:
while True: while True:
@ -75,6 +79,7 @@ class NotificationHandler:
break break
await ws.send_str(notification) await ws.send_str(notification)
finally: finally:
log.info("Client has disconnected from controller WebSocket")
if not ws.closed: if not ws.closed:
await ws.close() await ws.close()
request.app['websockets'].discard(ws) request.app['websockets'].discard(ws)