Fix issue with notification queue that prevented to properly close projects. Fix #1493

This commit is contained in:
grossmj 2019-01-12 16:02:36 +07:00
parent 161c05a310
commit a896346c77
4 changed files with 34 additions and 29 deletions

View File

@ -17,7 +17,6 @@
import os
import aiohttp
import asyncio
from contextlib import contextmanager
from ..notification_queue import NotificationQueue
@ -43,8 +42,10 @@ class Notification:
queue = NotificationQueue()
self._project_listeners.setdefault(project.id, set())
self._project_listeners[project.id].add(queue)
yield queue
self._project_listeners[project.id].remove(queue)
try:
yield queue
finally:
self._project_listeners[project.id].remove(queue)
@contextmanager
def controller_queue(self):
@ -55,8 +56,10 @@ class Notification:
"""
queue = NotificationQueue()
self._controller_listeners.append(queue)
yield queue
self._controller_listeners.remove(queue)
try:
yield queue
finally:
self._controller_listeners.remove(queue)
def controller_emit(self, action, event):
"""

View File

@ -44,15 +44,17 @@ class NotificationHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with notifications.queue() as queue:
try:
try:
with notifications.queue() as queue:
while True:
notification = await queue.get_json(1)
if ws.closed:
break
await ws.send_str(notification)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
return ws

View File

@ -67,16 +67,16 @@ class NotificationHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.controller_queue() as queue:
try:
try:
with controller.notification.controller_queue() as queue:
while True:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
return ws

View File

@ -259,24 +259,24 @@ class ProjectHandler:
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.project_queue(project) as queue:
try:
try:
with controller.notification.project_queue(project) as queue:
while True:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
finally:
if not ws.closed:
await ws.close()
request.app['websockets'].discard(ws)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project):
await project.close()
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking
# if someone else is not connected
await asyncio.sleep(5)
if not controller.notification.project_has_listeners(project):
await project.close()
return ws