diff --git a/gns3server/compute/base_manager.py b/gns3server/compute/base_manager.py index f383a197..602b0017 100644 --- a/gns3server/compute/base_manager.py +++ b/gns3server/compute/base_manager.py @@ -30,6 +30,7 @@ log = logging.getLogger(__name__) from uuid import UUID, uuid4 from gns3server.utils.interfaces import is_interface_up +from gns3server.utils.asyncio import asyncio_ensure_future from ..config import Config from ..utils.asyncio import wait_run_in_executor from ..utils import force_unix_path @@ -127,7 +128,7 @@ class BaseManager: tasks = [] for node_id in self._nodes.keys(): - tasks.append(asyncio.async(self.close_node(node_id))) + tasks.append(asyncio_ensure_future(self.close_node(node_id))) if tasks: done, _ = yield from asyncio.wait(tasks) diff --git a/gns3server/compute/docker/docker_vm.py b/gns3server/compute/docker/docker_vm.py index 242078ee..8db488f8 100644 --- a/gns3server/compute/docker/docker_vm.py +++ b/gns3server/compute/docker/docker_vm.py @@ -29,6 +29,7 @@ import os from gns3server.utils.asyncio.telnet_server import AsyncioTelnetServer from gns3server.utils.asyncio.raw_command_server import AsyncioRawCommandServer from gns3server.utils.asyncio import wait_for_file_creation +from gns3server.utils.asyncio import asyncio_ensure_future from gns3server.utils.get_resource import get_resource from gns3server.ubridge.ubridge_error import UbridgeError, UbridgeNamespaceError @@ -567,7 +568,7 @@ class DockerVM(BaseNode): output_stream.feed_data(self.name.encode() + b" console is now available... Press RETURN to get started.\r\n") - asyncio.async(self._read_console_output(self._console_websocket, output_stream)) + asyncio_ensure_future(self._read_console_output(self._console_websocket, output_stream)) @asyncio.coroutine def _read_console_output(self, ws, out): diff --git a/gns3server/compute/dynamips/__init__.py b/gns3server/compute/dynamips/__init__.py index c972c86a..b06f5ca0 100644 --- a/gns3server/compute/dynamips/__init__.py +++ b/gns3server/compute/dynamips/__init__.py @@ -36,6 +36,7 @@ log = logging.getLogger(__name__) from gns3server.utils.interfaces import interfaces, is_interface_up from gns3server.utils.asyncio import wait_run_in_executor from gns3server.utils import parse_version +from gns3server.utils.asyncio import asyncio_ensure_future from uuid import uuid4 from ..base_manager import BaseManager from ..port_manager import PortManager @@ -172,7 +173,7 @@ class Dynamips(BaseManager): tasks = [] for device in self._devices.values(): - tasks.append(asyncio.async(device.hypervisor.stop())) + tasks.append(asyncio_ensure_future(device.hypervisor.stop())) if tasks: done, _ = yield from asyncio.wait(tasks) @@ -196,7 +197,7 @@ class Dynamips(BaseManager): tasks = [] for device in self._devices.values(): if device.project.id == project.id: - tasks.append(asyncio.async(device.delete())) + tasks.append(asyncio_ensure_future(device.delete())) if tasks: done, _ = yield from asyncio.wait(tasks) diff --git a/gns3server/compute/dynamips/nodes/router.py b/gns3server/compute/dynamips/nodes/router.py index 8ff50afe..e42694d7 100644 --- a/gns3server/compute/dynamips/nodes/router.py +++ b/gns3server/compute/dynamips/nodes/router.py @@ -40,7 +40,7 @@ from ..nios.nio_udp import NIOUDP from gns3server.utils.file_watcher import FileWatcher -from gns3server.utils.asyncio import wait_run_in_executor, monitor_process +from gns3server.utils.asyncio import wait_run_in_executor, monitor_process, asyncio_ensure_future from gns3server.utils.images import md5sum @@ -196,7 +196,7 @@ class Router(BaseNode): """ Called when the NVRAM file has changed """ - asyncio.async(self.save_configs()) + asyncio_ensure_future(self.save_configs()) @property def dynamips_id(self): diff --git a/gns3server/compute/project.py b/gns3server/compute/project.py index 3792a8a7..ee2b1c19 100644 --- a/gns3server/compute/project.py +++ b/gns3server/compute/project.py @@ -29,7 +29,7 @@ from uuid import UUID, uuid4 from .port_manager import PortManager from .notification_manager import NotificationManager from ..config import Config -from ..utils.asyncio import wait_run_in_executor +from ..utils.asyncio import wait_run_in_executor, asyncio_ensure_future from ..utils.path import check_path_allowed, get_default_project_directory import logging @@ -346,7 +346,7 @@ class Project: tasks = [] for node in self._nodes: - tasks.append(asyncio.async(node.manager.close_node(node.id))) + tasks.append(asyncio_ensure_future(node.manager.close_node(node.id))) if tasks: done, _ = yield from asyncio.wait(tasks) diff --git a/gns3server/compute/vmware/__init__.py b/gns3server/compute/vmware/__init__.py index cda42c54..549a35ae 100644 --- a/gns3server/compute/vmware/__init__.py +++ b/gns3server/compute/vmware/__init__.py @@ -32,6 +32,7 @@ import shlex from collections import OrderedDict from gns3server.utils.interfaces import interfaces from gns3server.utils.asyncio import subprocess_check_output +from gns3server.utils.asyncio import asyncio_ensure_future from gns3server.utils import parse_version log = logging.getLogger(__name__) @@ -738,4 +739,4 @@ if __name__ == '__main__': loop = asyncio.get_event_loop() vmware = VMware.instance() print("=> Check version") - loop.run_until_complete(asyncio.async(vmware.check_vmware_version())) + loop.run_until_complete(asyncio_ensure_future(vmware.check_vmware_version())) diff --git a/gns3server/controller/compute.py b/gns3server/controller/compute.py index da81b6e9..c3231a32 100644 --- a/gns3server/controller/compute.py +++ b/gns3server/controller/compute.py @@ -27,7 +27,7 @@ from operator import itemgetter from ..utils import parse_version from ..utils.images import list_images -from ..utils.asyncio import locked_coroutine +from ..utils.asyncio import locked_coroutine, asyncio_ensure_future from ..controller.controller_error import ControllerError from ..version import __version__, __version_info__ @@ -418,7 +418,7 @@ class Compute: if self._connection_failure == 5: log.warning("Cannot connect to compute '{}': {}".format(self._id, e)) yield from self._controller.close_compute_projects(self) - asyncio.get_event_loop().call_later(2, lambda: asyncio.async(self._try_reconnect())) + asyncio.get_event_loop().call_later(2, lambda: asyncio_ensure_future(self._try_reconnect())) return except aiohttp.web.HTTPNotFound: raise aiohttp.web.HTTPConflict(text="The server {} is not a GNS3 server or it's a 1.X server".format(self._id)) @@ -494,7 +494,7 @@ class Compute: # Try to reconnect after 1 seconds if server unavailable only if not during tests (otherwise we create a ressources usage bomb) if not hasattr(sys, "_called_from_test") or not sys._called_from_test: - asyncio.get_event_loop().call_later(1, lambda: asyncio.async(self.connect())) + asyncio.get_event_loop().call_later(1, lambda: asyncio_ensure_future(self.connect())) self._ws = None self._cpu_usage_percent = None self._memory_usage_percent = None diff --git a/gns3server/controller/gns3vm/__init__.py b/gns3server/controller/gns3vm/__init__.py index 7f5dcd76..8ffd25b6 100644 --- a/gns3server/controller/gns3vm/__init__.py +++ b/gns3server/controller/gns3vm/__init__.py @@ -21,7 +21,7 @@ import asyncio import aiohttp import ipaddress -from ...utils.asyncio import locked_coroutine +from ...utils.asyncio import locked_coroutine, asyncio_ensure_future from .vmware_gns3_vm import VMwareGNS3VM from .virtualbox_gns3_vm import VirtualBoxGNS3VM from .remote_gns3_vm import RemoteGNS3VM @@ -302,7 +302,7 @@ class GNS3VM: # check if the VM is in the same subnet as the local server, start 10 seconds later to give # some time for the compute in the VM to be ready for requests - asyncio.get_event_loop().call_later(10, lambda: asyncio.async(self._check_network(compute))) + asyncio.get_event_loop().call_later(10, lambda: asyncio_ensure_future(self._check_network(compute))) @asyncio.coroutine def _check_network(self, compute): diff --git a/gns3server/controller/link.py b/gns3server/controller/link.py index 9f7ac83a..822bdbae 100644 --- a/gns3server/controller/link.py +++ b/gns3server/controller/link.py @@ -22,6 +22,8 @@ import html import asyncio import aiohttp +from gns3server.utils.asyncio import asyncio_ensure_future + import logging log = logging.getLogger(__name__) @@ -296,7 +298,7 @@ class Link: self._capturing = True self._capture_file_name = capture_file_name - self._streaming_pcap = asyncio.async(self._start_streaming_pcap()) + self._streaming_pcap = asyncio_ensure_future(self._start_streaming_pcap()) self._project.controller.notification.emit("link.updated", self.__json__()) @asyncio.coroutine diff --git a/gns3server/controller/project.py b/gns3server/controller/project.py index 7d2ef53d..7c750f80 100644 --- a/gns3server/controller/project.py +++ b/gns3server/controller/project.py @@ -38,6 +38,7 @@ from ..utils.path import check_path_allowed, get_default_project_directory from ..utils.asyncio.pool import Pool from ..utils.asyncio import locked_coroutine from ..utils.asyncio import wait_run_in_executor +from ..utils.asyncio import asyncio_ensure_future from .export_project import export_project from .import_project import import_project @@ -887,7 +888,7 @@ class Project: # Start all in the background without waiting for completion # we ignore errors because we want to let the user open # their project and fix it - asyncio.async(self.start_all()) + asyncio_ensure_future(self.start_all()) @asyncio.coroutine def wait_loaded(self): diff --git a/gns3server/handlers/api/compute/notification_handler.py b/gns3server/handlers/api/compute/notification_handler.py index 3580a286..d41bcaa9 100644 --- a/gns3server/handlers/api/compute/notification_handler.py +++ b/gns3server/handlers/api/compute/notification_handler.py @@ -20,6 +20,7 @@ import aiohttp from aiohttp.web import WebSocketResponse from gns3server.web.route import Route from gns3server.compute.notification_manager import NotificationManager +from gns3server.utils.asyncio import asyncio_ensure_future @asyncio.coroutine @@ -43,7 +44,7 @@ class NotificationHandler: ws = WebSocketResponse() yield from ws.prepare(request) - asyncio.async(process_websocket(ws)) + asyncio_ensure_future(process_websocket(ws)) with notifications.queue() as queue: while True: diff --git a/gns3server/handlers/api/controller/project_handler.py b/gns3server/handlers/api/controller/project_handler.py index e487c1d7..27ffbe2a 100644 --- a/gns3server/handlers/api/controller/project_handler.py +++ b/gns3server/handlers/api/controller/project_handler.py @@ -25,6 +25,7 @@ from gns3server.controller import Controller from gns3server.controller.import_project import import_project from gns3server.controller.export_project import export_project from gns3server.config import Config +from gns3server.utils.asyncio import asyncio_ensure_future from gns3server.schemas.project import ( @@ -246,7 +247,7 @@ class ProjectHandler: ws = aiohttp.web.WebSocketResponse() yield from ws.prepare(request) - asyncio.async(process_websocket(ws)) + asyncio_ensure_future(process_websocket(ws)) with controller.notification.queue(project) as queue: while True: diff --git a/gns3server/handlers/api/controller/server_handler.py b/gns3server/handlers/api/controller/server_handler.py index 972f8c54..465ea6ed 100644 --- a/gns3server/handlers/api/controller/server_handler.py +++ b/gns3server/handlers/api/controller/server_handler.py @@ -20,6 +20,7 @@ from gns3server.config import Config from gns3server.controller import Controller from gns3server.schemas.version import VERSION_SCHEMA from gns3server.version import __version__ +from gns3server.utils.asyncio import asyncio_ensure_future from aiohttp.web import HTTPConflict, HTTPForbidden @@ -57,7 +58,7 @@ class ServerHandler: tasks = [] for project in projects: - tasks.append(asyncio.async(project.close())) + tasks.append(asyncio_ensure_future(project.close())) if tasks: done, _ = yield from asyncio.wait(tasks) @@ -71,7 +72,7 @@ class ServerHandler: # then shutdown the server itself from gns3server.web.web_server import WebServer server = WebServer.instance() - asyncio.async(server.shutdown_server()) + asyncio_ensure_future(server.shutdown_server()) response.set_status(201) @Route.get( diff --git a/gns3server/utils/asyncio/__init__.py b/gns3server/utils/asyncio/__init__.py index 34e7571c..70a2cae1 100644 --- a/gns3server/utils/asyncio/__init__.py +++ b/gns3server/utils/asyncio/__init__.py @@ -108,7 +108,7 @@ def _check_process(process, termination_callback): def monitor_process(process, termination_callback): """Call termination_callback when a process dies""" - asyncio.async(_check_process(process, termination_callback)) + asyncio_ensure_future(_check_process(process, termination_callback)) @asyncio.coroutine @@ -158,3 +158,10 @@ def locked_coroutine(f): return (yield from f(*args, **kwargs)) return new_function + +#FIXME: conservative approach to supported versions, please remove it when we drop the support to Python < 3.4.4 +try: + from asyncio import ensure_future + asyncio_ensure_future = asyncio.ensure_future +except ImportError: + asyncio_ensure_future = getattr(asyncio, 'async') diff --git a/gns3server/utils/asyncio/raw_command_server.py b/gns3server/utils/asyncio/raw_command_server.py index da6e82c1..1a50ed0b 100644 --- a/gns3server/utils/asyncio/raw_command_server.py +++ b/gns3server/utils/asyncio/raw_command_server.py @@ -20,6 +20,8 @@ import copy import asyncio import asyncio.subprocess +from gns3server.utils.asyncio import asyncio_ensure_future + import logging log = logging.getLogger(__name__) @@ -69,8 +71,8 @@ class AsyncioRawCommandServer: else: replaces.append((replace[0], replace[1], )) - network_read = asyncio.async(network_reader.read(READ_SIZE)) - reader_read = asyncio.async(process_reader.read(READ_SIZE)) + network_read = asyncio_ensure_future(network_reader.read(READ_SIZE)) + reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE)) timeout = 30 while True: @@ -89,7 +91,7 @@ class AsyncioRawCommandServer: if network_reader.at_eof(): raise ConnectionResetError() - network_read = asyncio.async(network_reader.read(READ_SIZE)) + network_read = asyncio_ensure_future(network_reader.read(READ_SIZE)) process_writer.write(data) yield from process_writer.drain() @@ -97,7 +99,7 @@ class AsyncioRawCommandServer: if process_reader.at_eof(): raise ConnectionResetError() - reader_read = asyncio.async(process_reader.read(READ_SIZE)) + reader_read = asyncio_ensure_future(process_reader.read(READ_SIZE)) for replace in replaces: data = data.replace(replace[0], replace[1]) diff --git a/gns3server/utils/asyncio/telnet_server.py b/gns3server/utils/asyncio/telnet_server.py index 782b12ae..2e736410 100644 --- a/gns3server/utils/asyncio/telnet_server.py +++ b/gns3server/utils/asyncio/telnet_server.py @@ -20,6 +20,8 @@ import asyncio import asyncio.subprocess import struct +from gns3server.utils.asyncio import asyncio_ensure_future + import logging log = logging.getLogger(__name__) @@ -212,9 +214,12 @@ class AsyncioTelnetServer: @asyncio.coroutine def close(self): for writer, connection in self._connections.items(): - writer.write_eof() - yield from writer.drain() - + try: + writer.write_eof() + yield from writer.drain() + except ConnectionResetError: + continue + @asyncio.coroutine def client_connected_hook(self): pass @@ -229,13 +234,13 @@ class AsyncioTelnetServer: self._reader_process = network_reader if self._reader: if self._reader_process == network_reader: - self._current_read = asyncio.async(self._reader.read(READ_SIZE)) + self._current_read = asyncio_ensure_future(self._reader.read(READ_SIZE)) return self._current_read return None @asyncio.coroutine def _process(self, network_reader, network_writer, connection): - network_read = asyncio.async(network_reader.read(READ_SIZE)) + network_read = asyncio_ensure_future(network_reader.read(READ_SIZE)) reader_read = yield from self._get_reader(network_reader) while True: @@ -261,7 +266,7 @@ class AsyncioTelnetServer: if network_reader.at_eof(): raise ConnectionResetError() - network_read = asyncio.async(network_reader.read(READ_SIZE)) + network_read = asyncio_ensure_future(network_reader.read(READ_SIZE)) if IAC in data: data = yield from self._IAC_parser(data, network_reader, network_writer, connection) @@ -418,10 +423,10 @@ if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) loop = asyncio.get_event_loop() - process = loop.run_until_complete(asyncio.async(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - stdin=asyncio.subprocess.PIPE))) + process = loop.run_until_complete(asyncio_ensure_future(asyncio.subprocess.create_subprocess_exec("/bin/sh", "-i", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + stdin=asyncio.subprocess.PIPE))) server = AsyncioTelnetServer(reader=process.stdout, writer=process.stdin, binary=False, echo=False) coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop) diff --git a/gns3server/web/web_server.py b/gns3server/web/web_server.py index c398e631..aad691d4 100644 --- a/gns3server/web/web_server.py +++ b/gns3server/web/web_server.py @@ -35,6 +35,7 @@ from ..compute import MODULES from ..compute.port_manager import PortManager from ..compute.qemu import Qemu from ..controller import Controller +from ..utils.asyncio import asyncio_ensure_future # do not delete this import import gns3server.handlers @@ -136,7 +137,7 @@ class WebServer: def signal_handler(signame, *args): log.warning("Server has got signal {}, exiting...".format(signame)) - asyncio.async(self.shutdown_server()) + asyncio_ensure_future(self.shutdown_server()) signals = ["SIGTERM", "SIGINT"] if sys.platform.startswith("win"): @@ -203,7 +204,7 @@ class WebServer: # Because with a large image collection # without md5sum already computed we start the # computing with server start - asyncio.async(Qemu.instance().list_images()) + asyncio_ensure_future(Qemu.instance().list_images()) def run(self): """ @@ -283,7 +284,7 @@ class WebServer: self._exit_handling() if server_config.getboolean("shell"): - asyncio.async(self.start_shell()) + asyncio_ensure_future(self.start_shell()) try: self._loop.run_forever()