Fix long-polling request for project notifications.

This commit is contained in:
grossmj 2019-02-23 21:08:52 +07:00
parent 10702f87bc
commit 7fe8f7e716
10 changed files with 82 additions and 64 deletions

View File

@ -460,10 +460,7 @@ class BaseManager:
if not data:
await asyncio.sleep(0.1)
continue
try:
await response.write(data)
except ConnectionError:
break
await response.write(data)
except FileNotFoundError:
raise aiohttp.web.HTTPNotFound()
except PermissionError:

View File

@ -446,13 +446,14 @@ class Compute:
msg = json.loads(response.data)
action = msg.pop("action")
event = msg.pop("event")
project_id = msg.pop("project_id", None)
if action == "ping":
self._cpu_usage_percent = event["cpu_usage_percent"]
self._memory_usage_percent = event["memory_usage_percent"]
#FIXME: slow down number of compute events
self._controller.notification.controller_emit("compute.updated", self.__json__())
else:
await self._controller.notification.dispatch(action, event, compute_id=self.id)
await self._controller.notification.dispatch(action, event, project_id=project_id, compute_id=self.id)
elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None:
self._connected = False
break

View File

@ -185,7 +185,7 @@ class Drawing:
data = self.__json__()
if not svg_changed:
del data["svg"]
self._project.controller.notification.project_emit("drawing.updated", data)
self._project.emit_notification("drawing.updated", data)
self._project.dump()
def __json__(self, topology_dump=False):

View File

@ -74,7 +74,7 @@ async def export_project(project, temporary_dir, include_images=False, keep_comp
except OSError as e:
msg = "Could not export file {}: {}".format(path, e)
log.warning(msg)
project.controller.notification.project_emit("log.warning", {"message": msg})
project.emit_notification("log.warning", {"message": msg})
continue
# ignore the .gns3 file
if file.endswith(".gns3"):

View File

@ -199,14 +199,14 @@ class Link:
self._filters = new_filters
if self._created:
await self.update()
self._project.controller.notification.project_emit("link.updated", self.__json__())
self._project.emit_notification("link.updated", self.__json__())
self._project.dump()
async def update_suspend(self, value):
if value != self._suspended:
self._suspended = value
await self.update()
self._project.controller.notification.project_emit("link.updated", self.__json__())
self._project.emit_notification("link.updated", self.__json__())
self._project.dump()
@property
@ -269,7 +269,7 @@ class Link:
n["node"].add_link(self)
n["port"].link = self
self._created = True
self._project.controller.notification.project_emit("link.created", self.__json__())
self._project.emit_notification("link.created", self.__json__())
if dump:
self._project.dump()
@ -282,7 +282,7 @@ class Link:
label = node_data.get("label")
if label:
port["label"] = label
self._project.controller.notification.project_emit("link.updated", self.__json__())
self._project.emit_notification("link.updated", self.__json__())
self._project.dump()
async def create(self):
@ -317,7 +317,7 @@ class Link:
self._capturing = True
self._capture_file_name = capture_file_name
self._project.controller.notification.project_emit("link.updated", self.__json__())
self._project.emit_notification("link.updated", self.__json__())
async def stop_capture(self):
"""
@ -325,7 +325,7 @@ class Link:
"""
self._capturing = False
self._project.controller.notification.project_emit("link.updated", self.__json__())
self._project.emit_notification("link.updated", self.__json__())
def pcap_streaming_url(self):
"""

View File

@ -405,7 +405,7 @@ class Node:
await self.parse_node_response(response.json)
elif old_json != self.__json__():
# We send notif only if object has changed
self.project.controller.notification.project_emit("node.updated", self.__json__())
self.project.emit_notification("node.updated", self.__json__())
self.project.dump()
async def parse_node_response(self, response):
@ -563,13 +563,13 @@ class Node:
for directory in images_directories(type):
image = os.path.join(directory, img)
if os.path.exists(image):
self.project.controller.notification.project_emit("log.info", {"message": "Uploading missing image {}".format(img)})
self.project.emit_notification("log.info", {"message": "Uploading missing image {}".format(img)})
try:
with open(image, 'rb') as f:
await self._compute.post("/{}/images/{}".format(self._node_type, os.path.basename(img)), data=f, timeout=None)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Can't upload {}: {}".format(image, str(e)))
self.project.controller.notification.project_emit("log.info", {"message": "Upload finished for {}".format(img)})
self.project.emit_notification("log.info", {"message": "Upload finished for {}".format(img)})
return True
return False

View File

@ -33,19 +33,19 @@ class Notification:
self._controller_listeners = []
@contextmanager
def project_queue(self, project):
def project_queue(self, project_id):
"""
Get a queue of notifications
Use it with Python with
"""
queue = NotificationQueue()
self._project_listeners.setdefault(project.id, set())
self._project_listeners[project.id].add(queue)
self._project_listeners.setdefault(project_id, set())
self._project_listeners[project_id].add(queue)
try:
yield queue
finally:
self._project_listeners[project.id].remove(queue)
self._project_listeners[project_id].remove(queue)
@contextmanager
def controller_queue(self):
@ -84,14 +84,14 @@ class Notification:
for controller_listener in self._controller_listeners:
controller_listener.put_nowait((action, event, {}))
def project_has_listeners(self, project):
def project_has_listeners(self, project_id):
"""
:param project_id: Project object
:returns: True if client listen this project
"""
return project.id in self._project_listeners and len(self._project_listeners[project.id]) > 0
return project_id in self._project_listeners and len(self._project_listeners[project_id]) > 0
async def dispatch(self, action, event, compute_id):
async def dispatch(self, action, event, project_id, compute_id):
"""
Notification received from compute node. Send it directly
to clients or process it
@ -110,13 +110,13 @@ class Notification:
self.project_emit("node.updated", node.__json__())
except (aiohttp.web.HTTPNotFound, aiohttp.web.HTTPForbidden): # Project closing
return
elif action == "ping":
event["compute_id"] = compute_id
self.project_emit(action, event)
# elif action == "ping":
# event["compute_id"] = compute_id
# self.project_emit(action, event)
else:
self.project_emit(action, event)
self.project_emit(action, event, project_id)
def project_emit(self, action, event):
def project_emit(self, action, event, project_id=None):
"""
Send a notification to clients scoped by projects
@ -136,8 +136,8 @@ class Notification:
except TypeError: # If we receive a mock as an event it will raise TypeError when using json dump
pass
if "project_id" in event:
self._send_event_to_project(event["project_id"], action, event)
if "project_id" in event or project_id:
self._send_event_to_project(event.get("project_id", project_id), action, event)
else:
self._send_event_to_all_projects(action, event)

View File

@ -122,6 +122,16 @@ class Project:
assert self._status != "closed"
self.dump()
def emit_notification(self, action, event):
"""
Emit a notification to all clients using this project.
:param action: Action name
:param event: Event to send
"""
self.controller.notification.project_emit(action, event, project_id=self.id)
async def update(self, **kwargs):
"""
Update the project
@ -135,7 +145,7 @@ class Project:
# We send notif only if object has changed
if old_json != self.__json__():
self.controller.notification.project_emit("project.updated", self.__json__())
self.emit_notification("project.updated", self.__json__())
self.dump()
# update on computes
@ -533,7 +543,7 @@ class Project:
self._project_created_on_compute.add(compute)
await node.create()
self._nodes[node.id] = node
self.controller.notification.project_emit("node.created", node.__json__())
self.emit_notification("node.created", node.__json__())
if dump:
self.dump()
return node
@ -558,7 +568,7 @@ class Project:
del self._nodes[node.id]
await node.destroy()
self.dump()
self.controller.notification.project_emit("node.deleted", node.__json__())
self.emit_notification("node.deleted", node.__json__())
@open_required
def get_node(self, node_id):
@ -623,7 +633,7 @@ class Project:
if drawing_id not in self._drawings:
drawing = Drawing(self, drawing_id=drawing_id, **kwargs)
self._drawings[drawing.id] = drawing
self.controller.notification.project_emit("drawing.created", drawing.__json__())
self.emit_notification("drawing.created", drawing.__json__())
if dump:
self.dump()
return drawing
@ -644,7 +654,7 @@ class Project:
drawing = self.get_drawing(drawing_id)
del self._drawings[drawing.id]
self.dump()
self.controller.notification.project_emit("drawing.deleted", drawing.__json__())
self.emit_notification("drawing.deleted", drawing.__json__())
@open_required
async def add_link(self, link_id=None, dump=True):
@ -671,7 +681,7 @@ class Project:
if force_delete is False:
raise
self.dump()
self.controller.notification.project_emit("link.deleted", link.__json__())
self.emit_notification("link.deleted", link.__json__())
@open_required
def get_link(self, link_id):
@ -743,7 +753,7 @@ class Project:
self._clean_pictures()
self._status = "closed"
if not ignore_notification:
self.controller.notification.project_emit("project.closed", self.__json__())
self.emit_notification("project.closed", self.__json__())
self.reset()
def _clean_pictures(self):

View File

@ -122,7 +122,7 @@ class Snapshot:
except (OSError, PermissionError) as e:
raise aiohttp.web.HTTPConflict(text=str(e))
await project.open()
self._project.controller.notification.project_emit("snapshot.restored", self.__json__())
self._project.emit_notification("snapshot.restored", self.__json__())
return self._project
def __json__(self):

View File

@ -220,24 +220,31 @@ class ProjectHandler:
async def notification(request, response):
controller = Controller.instance()
project = controller.get_project(request.match_info["project_id"])
project_id = request.match_info["project_id"]
response.content_type = "application/json"
response.set_status(200)
response.enable_chunked_encoding()
await response.prepare(request)
with controller.notification.project_queue(project) as queue:
while True:
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
log.info("New client has connected to the notification stream for project ID '{}' (HTTP long-polling method)".format(project_id))
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project):
await project.close()
try:
with controller.notification.project_queue(project_id) as queue:
while True:
msg = await queue.get_json(5)
await response.write(("{}\n".format(msg)).encode("utf-8"))
finally:
log.info("Client has disconnected from notification for project ID '{}' (HTTP long-polling method)".format(project_id))
try:
project = controller.get_project(project_id)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project.id):
log.info("Project '{}' is automatically closing due to no client listening".format(project.id))
await project.close()
except aiohttp.web.HTTPNotFound:
pass
@Route.get(
r"/projects/{project_id}/notifications/ws",
@ -252,31 +259,36 @@ class ProjectHandler:
async def notification_ws(request, response):
controller = Controller.instance()
project = controller.get_project(request.match_info["project_id"])
project_id = request.match_info["project_id"]
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
log.info("New client has connected to the notification stream for project ID '{}' (WebSocket method)".format(project_id))
try:
with controller.notification.project_queue(project) as queue:
with controller.notification.project_queue(project_id) as queue:
while True:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
finally:
log.info("Client has disconnected from notification stream for project ID '{}' (WebSocket method)".format(project_id))
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project):
await project.close()
try:
project = controller.get_project(project_id)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project_id):
log.info("Project '{}' is automatically closing due to no client listening".format(project.id))
await project.close()
except aiohttp.web.HTTPNotFound:
pass
return ws
@ -298,9 +310,7 @@ class ProjectHandler:
try:
with tempfile.TemporaryDirectory() as tmp_dir:
stream = await export_project(project,
tmp_dir,
include_images=bool(int(request.query.get("include_images", "0"))))
stream = await export_project(project, tmp_dir, include_images=bool(int(request.query.get("include_images", "0"))))
# We need to do that now because export could failed and raise an HTTP error
# that why response start need to be the later possible
response.content_type = 'application/gns3project'