Fix installation with Python 3.7. Fixes #1414.

Fix deprecated use of aiohttp.Timeout. Fixes #1296.
Use "async with" with aiohttp.ClientSession().
Make sure websocket connections are properly closed, see https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown
Finish to drop Python 3.4.
This commit is contained in:
grossmj 2018-10-16 15:56:06 +07:00
parent 8217f65e97
commit 86f87aec74
11 changed files with 124 additions and 155 deletions

View File

@ -93,6 +93,8 @@ class Docker(BaseManager):
if self._connected:
if self._connector and not self._connector.closed:
self._connector.close()
if self._session and not self._session.closed:
await self._session.close()
async def query(self, method, path, data={}, params={}):
"""
@ -106,6 +108,7 @@ class Docker(BaseManager):
response = await self.http_query(method, path, data=data, params=params)
body = await response.read()
response.close()
if body and len(body):
if response.headers['CONTENT-TYPE'] == 'application/json':
body = json.loads(body.decode("utf-8"))
@ -140,14 +143,12 @@ class Docker(BaseManager):
if self._session is None or self._session.closed:
connector = self.connector()
self._session = aiohttp.ClientSession(connector=connector)
response = await self._session.request(
method,
url,
params=params,
data=data,
headers={"content-type": "application/json", },
timeout=timeout
)
response = await self._session.request(method,
url,
params=params,
data=data,
headers={"content-type": "application/json", },
timeout=timeout)
except (aiohttp.ClientResponseError, aiohttp.ClientOSError) as e:
raise DockerError("Docker has returned an error: {}".format(str(e)))
except (asyncio.TimeoutError):
@ -177,9 +178,7 @@ class Docker(BaseManager):
"""
url = "http://docker/v" + self._api_version + "/" + path
connection = await self._session.ws_connect(url,
origin="http://docker",
autoping=True)
connection = await self._session.ws_connect(url, origin="http://docker", autoping=True)
return connection
@locking

View File

@ -67,50 +67,46 @@ class Controller:
@locking
async def download_appliance_templates(self):
session = aiohttp.ClientSession()
try:
headers = {}
if self._appliance_templates_etag:
log.info("Checking if appliance templates are up-to-date (ETag {})".format(self._appliance_templates_etag))
headers["If-None-Match"] = self._appliance_templates_etag
response = await session.get('https://api.github.com/repos/GNS3/gns3-registry/contents/appliances', headers=headers)
if response.status == 304:
log.info("Appliance templates are already up-to-date (ETag {})".format(self._appliance_templates_etag))
return
elif response.status != 200:
raise aiohttp.web.HTTPConflict(text="Could not retrieve appliance templates on GitHub due to HTTP error code {}".format(response.status))
etag = response.headers.get("ETag")
if etag:
self._appliance_templates_etag = etag
self.save()
json_data = await response.json()
response.close()
appliances_dir = get_resource('appliances')
for appliance in json_data:
if appliance["type"] == "file":
appliance_name = appliance["name"]
log.info("Download appliance template file from '{}'".format(appliance["download_url"]))
response = await session.get(appliance["download_url"])
if response.status != 200:
log.warning("Could not download '{}' due to HTTP error code {}".format(appliance["download_url"], response.status))
continue
try:
appliance_data = await response.read()
except asyncio.TimeoutError:
log.warning("Timeout while downloading '{}'".format(appliance["download_url"]))
continue
path = os.path.join(appliances_dir, appliance_name)
try:
log.info("Saving {} file to {}".format(appliance_name, path))
with open(path, 'wb') as f:
f.write(appliance_data)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Could not write appliance template file '{}': {}".format(path, e))
async with aiohttp.ClientSession() as session:
async with session.get('https://api.github.com/repos/GNS3/gns3-registry/contents/appliances', headers=headers) as response:
if response.status == 304:
log.info("Appliance templates are already up-to-date (ETag {})".format(self._appliance_templates_etag))
return
elif response.status != 200:
raise aiohttp.web.HTTPConflict(text="Could not retrieve appliance templates on GitHub due to HTTP error code {}".format(response.status))
etag = response.headers.get("ETag")
if etag:
self._appliance_templates_etag = etag
self.save()
json_data = await response.json()
appliances_dir = get_resource('appliances')
for appliance in json_data:
if appliance["type"] == "file":
appliance_name = appliance["name"]
log.info("Download appliance template file from '{}'".format(appliance["download_url"]))
async with session.get(appliance["download_url"]) as response:
if response.status != 200:
log.warning("Could not download '{}' due to HTTP error code {}".format(appliance["download_url"], response.status))
continue
try:
appliance_data = await response.read()
except asyncio.TimeoutError:
log.warning("Timeout while downloading '{}'".format(appliance["download_url"]))
continue
path = os.path.join(appliances_dir, appliance_name)
try:
log.info("Saving {} file to {}".format(appliance_name, path))
with open(path, 'wb') as f:
f.write(appliance_data)
except OSError as e:
raise aiohttp.web.HTTPConflict(text="Could not write appliance template file '{}': {}".format(path, e))
except ValueError as e:
raise aiohttp.web.HTTPConflict(text="Could not read appliance templates information from GitHub: {}".format(e))
finally:
session.close()
def load_appliance_templates(self):

View File

@ -18,6 +18,7 @@
import ipaddress
import aiohttp
import asyncio
import async_timeout
import socket
import json
import uuid
@ -52,22 +53,6 @@ class ComputeConflict(aiohttp.web.HTTPConflict):
self.response = response
class Timeout(aiohttp.Timeout):
"""
Could be removed with aiohttp 0.22 that support None timeout
"""
def __enter__(self):
if self._timeout:
return super().__enter__()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._timeout:
return super().__exit__(exc_type, exc_val, exc_tb)
return self
class Compute:
"""
A GNS3 compute.
@ -101,12 +86,8 @@ class Compute:
"node_types": []
}
self.name = name
# Websocket for notifications
self._ws = None
# Cache of interfaces on remote host
self._interfaces_cache = None
self._connection_failure = 0
def _session(self):
@ -114,9 +95,10 @@ class Compute:
self._http_session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=None, force_close=True))
return self._http_session
def __del__(self):
if self._http_session:
self._http_session.close()
#def __del__(self):
# pass
# if self._http_session:
# self._http_session.close()
def _set_auth(self, user, password):
"""
@ -162,19 +144,16 @@ class Compute:
# It's important to set user and password at the same time
if "user" in kwargs or "password" in kwargs:
self._set_auth(kwargs.get("user", self._user), kwargs.get("password", self._password))
if self._http_session:
self._http_session.close()
if self._http_session and not self._http_session.closed:
await self._http_session.close()
self._connected = False
self._controller.notification.controller_emit("compute.updated", self.__json__())
self._controller.save()
async def close(self):
self._connected = False
if self._http_session:
self._http_session.close()
if self._ws:
await self._ws.close()
self._ws = None
if self._http_session and not self._http_session.closed:
await self._http_session.close()
self._closed = True
@property
@ -474,35 +453,27 @@ class Compute:
"""
Connect to the notification stream
"""
try:
self._ws = await self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth)
except (aiohttp.WSServerHandshakeError, aiohttp.ClientResponseError):
self._ws = None
while self._ws is not None:
try:
response = await self._ws.receive()
except aiohttp.WSServerHandshakeError:
self._ws = None
break
if response.tp == aiohttp.WSMsgType.closed or response.tp == aiohttp.WSMsgType.error or response.data is None:
self._connected = False
break
msg = json.loads(response.data)
action = msg.pop("action")
event = msg.pop("event")
if action == "ping":
self._cpu_usage_percent = event["cpu_usage_percent"]
self._memory_usage_percent = event["memory_usage_percent"]
self._controller.notification.controller_emit("compute.updated", self.__json__())
else:
await self._controller.notification.dispatch(action, event, compute_id=self.id)
if self._ws:
await self._ws.close()
async with self._session().ws_connect(self._getUrl("/notifications/ws"), auth=self._auth) as ws:
async for response in ws:
if response.type == aiohttp.WSMsgType.TEXT and response.data:
msg = json.loads(response.data)
action = msg.pop("action")
event = msg.pop("event")
if action == "ping":
self._cpu_usage_percent = event["cpu_usage_percent"]
self._memory_usage_percent = event["memory_usage_percent"]
self._controller.notification.controller_emit("compute.updated", self.__json__())
else:
await self._controller.notification.dispatch(action, event, compute_id=self.id)
elif response.type == aiohttp.WSMsgType.CLOSED or response.type == aiohttp.WSMsgType.ERROR or response.data is None:
self._connected = False
break
# Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb)
if not hasattr(sys, "_called_from_test") or not sys._called_from_test:
asyncio.get_event_loop().call_later(1, lambda: asyncio.ensure_future(self.connect()))
self._ws = None
self._cpu_usage_percent = None
self._memory_usage_percent = None
self._controller.notification.controller_emit("compute.updated", self.__json__())
@ -527,7 +498,7 @@ class Compute:
return self._getUrl(path)
async def _run_http_query(self, method, path, data=None, timeout=20, raw=False):
with Timeout(timeout):
with async_timeout.timeout(timeout):
url = self._getUrl(path)
headers = {}
headers['content-type'] = 'application/json'

View File

@ -261,29 +261,22 @@ class VirtualBoxGNS3VM(BaseGNS3VM):
"""
remaining_try = 300
while remaining_try > 0:
json_data = None
session = aiohttp.ClientSession()
try:
resp = None
resp = await session.get('http://127.0.0.1:{}/v2/compute/network/interfaces'.format(api_port))
except (OSError, aiohttp.ClientError, TimeoutError, asyncio.TimeoutError):
pass
if resp:
if resp.status < 300:
try:
json_data = await resp.json()
except ValueError:
pass
resp.close()
session.close()
if json_data:
for interface in json_data:
if "name" in interface and interface["name"] == "eth{}".format(hostonly_interface_number - 1):
if "ip_address" in interface and len(interface["ip_address"]) > 0:
return interface["ip_address"]
async with aiohttp.ClientSession() as session:
try:
async with session.get('http://127.0.0.1:{}/v2/compute/network/interfaces'.format(api_port)) as resp:
if resp.status < 300:
try:
json_data = await resp.json()
if json_data:
for interface in json_data:
if "name" in interface and interface["name"] == "eth{}".format(
hostonly_interface_number - 1):
if "ip_address" in interface and len(interface["ip_address"]) > 0:
return interface["ip_address"]
except ValueError:
pass
except (OSError, aiohttp.ClientError, TimeoutError, asyncio.TimeoutError):
pass
remaining_try -= 1
await asyncio.sleep(1)
raise GNS3VMError("Could not get the GNS3 VM ip make sure the VM receive an IP from VirtualBox")

View File

@ -42,15 +42,17 @@ class NotificationHandler:
ws = WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with notifications.queue() as queue:
while True:
try:
notification = await queue.get_json(1)
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
if ws.closed:
break
ws.send_str(notification)
finally:
request.app['websockets'].discard(ws)
return ws

View File

@ -69,14 +69,17 @@ class NotificationHandler:
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.controller_queue() as queue:
while True:
try:
notification = await queue.get_json(5)
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
if ws.closed:
break
ws.send_str(notification)
finally:
request.app['websockets'].discard(ws)
return ws

View File

@ -261,17 +261,19 @@ class ProjectHandler:
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
request.app['websockets'].add(ws)
asyncio.ensure_future(process_websocket(ws))
with controller.notification.project_queue(project) as queue:
while True:
try:
notification = await queue.get_json(5)
except asyncio.futures.CancelledError as e:
if ws.closed:
break
await ws.send_str(notification)
except asyncio.futures.CancelledError:
break
if ws.closed:
break
ws.send_str(notification)
finally:
request.app['websockets'].discard(ws)
if project.auto_close:
# To avoid trouble with client connecting disconnecting we sleep few seconds before checking

View File

@ -51,6 +51,7 @@ class Response(aiohttp.web.Response):
super().enable_chunked_encoding()
async def prepare(self, request):
if log.getEffectiveLevel() == logging.DEBUG:
log.info("%s %s", request.method, request.path_qs)
log.debug("%s", dict(request.headers))

View File

@ -28,6 +28,7 @@ import aiohttp_cors
import functools
import time
import atexit
import weakref
# Import encoding now, to avoid implicit import later.
# Implicit import within threads may cause LookupError when standard library is in a ZIP
@ -48,8 +49,8 @@ import gns3server.handlers
import logging
log = logging.getLogger(__name__)
if not (aiohttp.__version__.startswith("2.2") or aiohttp.__version__.startswith("2.3")):
raise RuntimeError("aiohttp 2.2.x or 2.3.x is required to run the GNS3 server")
if not (aiohttp.__version__.startswith("3.")):
raise RuntimeError("aiohttp 3.x is required to run the GNS3 server")
class WebServer:
@ -100,18 +101,17 @@ class WebServer:
log.warning("Close is already in progress")
return
# close websocket connections
for ws in set(self._app['websockets']):
await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server shutdown')
if self._server:
self._server.close()
await self._server.wait_closed()
if self._app:
await self._app.shutdown()
if self._handler:
try:
# aiohttp < 2.3
await self._handler.finish_connections(2) # Parameter is timeout
except AttributeError:
# aiohttp >= 2.3
await self._handler.shutdown(2) # Parameter is timeout
await self._handler.shutdown(2) # Parameter is timeout
if self._app:
await self._app.cleanup()
@ -254,6 +254,10 @@ class WebServer:
log.debug("ENV %s=%s", key, val)
self._app = aiohttp.web.Application()
# Keep a list of active websocket connections
self._app['websockets'] = weakref.WeakSet()
# Background task started with the server
self._app.on_startup.append(self._on_startup)

View File

@ -1,12 +1,10 @@
jsonschema>=2.4.0
aiohttp>=2.3.3,<2.4.0 # pyup: ignore
aiohttp-cors>=0.5.3,<0.6.0 # pyup: ignore
yarl>=0.11
aiohttp==3.2.1
aiohttp-cors==0.7.0
Jinja2>=2.7.3
raven>=5.23.0
psutil>=3.0.0
zipstream>=1.1.4
typing>=3.5.3.0 # Otherwise yarl fails with python 3.4
prompt-toolkit==1.0.15
async-timeout<3.0.0 # pyup: ignore; 3.0 drops support for python 3.4
async-timeout==3.0.1
distro>=1.3.0

View File

@ -19,9 +19,9 @@ import sys
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
# we only support Python 3 version >= 3.4
if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 4):
raise SystemExit("Python 3.4 or higher is required")
# we only support Python 3 version >= 3.5.3
if len(sys.argv) >= 2 and sys.argv[1] == "install" and sys.version_info < (3, 5, 3):
raise SystemExit("Python 3.5.3 or higher is required")
class PyTest(TestCommand):