mirror of
https://github.com/GNS3/gns3-server.git
synced 2024-11-16 16:54:51 +02:00
Replace asyncio.async() by asyncio.ensure_future() in tests. Ref #2566.
This commit is contained in:
parent
089d25c79d
commit
a3d1e865a8
@ -45,7 +45,7 @@ def test_query_success(loop, vm):
|
||||
|
||||
response.read.side_effect = read
|
||||
vm._session.request = AsyncioMagicMock(return_value=response)
|
||||
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
data = loop.run_until_complete(asyncio.ensure_future(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
vm._session.request.assert_called_with('POST',
|
||||
'http://docker/v1.25/test',
|
||||
data='{"a": true}',
|
||||
@ -68,7 +68,7 @@ def test_query_error(loop, vm):
|
||||
response.read.side_effect = read
|
||||
vm._session.request = AsyncioMagicMock(return_value=response)
|
||||
with pytest.raises(DockerError):
|
||||
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
data = loop.run_until_complete(asyncio.ensure_future(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
vm._session.request.assert_called_with('POST',
|
||||
'http://docker/v1.25/test',
|
||||
data='{"a": true}',
|
||||
@ -89,7 +89,7 @@ def test_query_error_json(loop, vm):
|
||||
response.read.side_effect = read
|
||||
vm._session.request = AsyncioMagicMock(return_value=response)
|
||||
with pytest.raises(DockerError):
|
||||
data = loop.run_until_complete(asyncio.async(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
data = loop.run_until_complete(asyncio.ensure_future(vm.query("POST", "test", data={"a": True}, params={"b": 1})))
|
||||
vm._session.request.assert_called_with('POST',
|
||||
'http://docker/v1.25/test',
|
||||
data='{"a": true}',
|
||||
@ -126,7 +126,7 @@ def test_list_images(loop):
|
||||
]
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
images = loop.run_until_complete(asyncio.async(Docker.instance().list_images()))
|
||||
images = loop.run_until_complete(asyncio.ensure_future(Docker.instance().list_images()))
|
||||
mock.assert_called_with("GET", "images/json", params={"all": 0})
|
||||
assert len(images) == 5
|
||||
assert {"image": "ubuntu:12.04"} in images
|
||||
@ -160,7 +160,7 @@ def test_pull_image(loop):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", side_effect=DockerHttp404Error("404")):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock:
|
||||
images = loop.run_until_complete(asyncio.async(Docker.instance().pull_image("ubuntu")))
|
||||
images = loop.run_until_complete(asyncio.ensure_future(Docker.instance().pull_image("ubuntu")))
|
||||
mock.assert_called_with("POST", "images/create", params={"fromImage": "ubuntu"}, timeout=None)
|
||||
|
||||
|
||||
@ -174,7 +174,7 @@ def test_docker_check_connection_docker_minimum_version(vm, loop):
|
||||
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
|
||||
vm._connected = False
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm._check_connection()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_connection()))
|
||||
|
||||
|
||||
def test_docker_check_connection_docker_preferred_version_against_newer(vm, loop):
|
||||
@ -185,7 +185,7 @@ def test_docker_check_connection_docker_preferred_version_against_newer(vm, loop
|
||||
with patch("gns3server.compute.docker.Docker.connector"), \
|
||||
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
|
||||
vm._connected = False
|
||||
loop.run_until_complete(asyncio.async(vm._check_connection()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_connection()))
|
||||
assert vm._api_version == DOCKER_PREFERRED_API_VERSION
|
||||
|
||||
|
||||
@ -197,5 +197,5 @@ def test_docker_check_connection_docker_preferred_version_against_older(vm, loop
|
||||
with patch("gns3server.compute.docker.Docker.connector"), \
|
||||
asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
|
||||
vm._connected = False
|
||||
loop.run_until_complete(asyncio.async(vm._check_connection()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_connection()))
|
||||
assert vm._api_version == DOCKER_MINIMUM_API_VERSION
|
@ -87,7 +87,7 @@ def test_create(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]) as mock_list_images:
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu:latest")
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
mock.assert_called_with("POST", "containers/create", data={
|
||||
"Tty": True,
|
||||
"OpenStdin": True,
|
||||
@ -126,7 +126,7 @@ def test_create_with_tag(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]) as mock_list_images:
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu:16.04")
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
mock.assert_called_with("POST", "containers/create", data={
|
||||
"Tty": True,
|
||||
"OpenStdin": True,
|
||||
@ -168,7 +168,7 @@ def test_create_vnc(loop, project, manager):
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu", console_type="vnc", console=5900)
|
||||
vm._start_vnc = MagicMock()
|
||||
vm._display = 42
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
mock.assert_called_with("POST", "containers/create", data={
|
||||
"Tty": True,
|
||||
"OpenStdin": True,
|
||||
@ -214,7 +214,7 @@ def test_create_with_extra_hosts(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu", extra_hosts=extra_hosts)
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
called_kwargs = mock.call_args[1]
|
||||
assert "GNS3_EXTRA_HOSTS=199.199.199.1\ttest\n199.199.199.1\ttest2" in called_kwargs["data"]["Env"]
|
||||
assert vm._extra_hosts == extra_hosts
|
||||
@ -232,7 +232,7 @@ def test_create_with_extra_hosts_wrong_format(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response):
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu", extra_hosts=extra_hosts)
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
|
||||
|
||||
def test_create_with_empty_extra_hosts(loop, project, manager):
|
||||
@ -246,7 +246,7 @@ def test_create_with_empty_extra_hosts(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu", extra_hosts=extra_hosts)
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
called_kwargs = mock.call_args[1]
|
||||
assert len([ e for e in called_kwargs["data"]["Env"] if "GNS3_EXTRA_HOSTS" in e]) == 0
|
||||
|
||||
@ -266,7 +266,7 @@ def test_create_with_project_variables(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu")
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
called_kwargs = mock.call_args[1]
|
||||
assert "VAR1=" in called_kwargs["data"]["Env"]
|
||||
assert "VAR2=VAL1" in called_kwargs["data"]["Env"]
|
||||
@ -284,7 +284,7 @@ def test_create_start_cmd(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu:latest")
|
||||
vm._start_command = "/bin/ls"
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
mock.assert_called_with("POST", "containers/create", data={
|
||||
"Tty": True,
|
||||
"OpenStdin": True,
|
||||
@ -328,7 +328,7 @@ def test_create_environment(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu")
|
||||
vm.environment = "YES=1\nNO=0\nGNS3_MAX_ETHERNET=eth2"
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
assert mock.call_args[1]['data']['Env'] == [
|
||||
"container=docker",
|
||||
"GNS3_MAX_ETHERNET=eth0",
|
||||
@ -352,7 +352,7 @@ def test_create_environment_with_last_new_line_character(loop, project, manager)
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu")
|
||||
vm.environment = "YES=1\nNO=0\nGNS3_MAX_ETHERNET=eth2\n"
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
assert mock.call_args[1]['data']['Env'] == [
|
||||
"container=docker",
|
||||
"GNS3_MAX_ETHERNET=eth0",
|
||||
@ -385,7 +385,7 @@ def test_create_image_not_available(loop, project, manager):
|
||||
vm._get_image_information.side_effect = information
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM.pull_image", return_value=True) as mock_pull:
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
mock.assert_called_with("POST", "containers/create", data={
|
||||
"Tty": True,
|
||||
"OpenStdin": True,
|
||||
@ -431,17 +431,17 @@ def test_get_container_state(loop, vm):
|
||||
}
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
assert loop.run_until_complete(asyncio.async(vm._get_container_state())) == "running"
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm._get_container_state())) == "running"
|
||||
|
||||
response["State"]["Running"] = False
|
||||
response["State"]["Paused"] = True
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
assert loop.run_until_complete(asyncio.async(vm._get_container_state())) == "paused"
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm._get_container_state())) == "paused"
|
||||
|
||||
response["State"]["Running"] = False
|
||||
response["State"]["Paused"] = False
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
assert loop.run_until_complete(asyncio.async(vm._get_container_state())) == "exited"
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm._get_container_state())) == "exited"
|
||||
|
||||
|
||||
def test_is_running(loop, vm):
|
||||
@ -452,17 +452,17 @@ def test_is_running(loop, vm):
|
||||
}
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
assert loop.run_until_complete(asyncio.async(vm.is_running())) is False
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm.is_running())) is False
|
||||
|
||||
response["State"]["Running"] = True
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
assert loop.run_until_complete(asyncio.async(vm.is_running())) is True
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm.is_running())) is True
|
||||
|
||||
|
||||
def test_pause(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.pause()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.pause()))
|
||||
|
||||
mock.assert_called_with("POST", "containers/e90e34656842/pause")
|
||||
assert vm.status == "suspended"
|
||||
@ -471,7 +471,7 @@ def test_pause(loop, vm):
|
||||
def test_unpause(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.unpause()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.unpause()))
|
||||
|
||||
mock.assert_called_with("POST", "containers/e90e34656842/unpause")
|
||||
|
||||
@ -491,10 +491,10 @@ def test_start(loop, vm, manager, free_console_port):
|
||||
vm._start_console = AsyncioMagicMock()
|
||||
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
|
||||
mock_query.assert_called_with("POST", "containers/e90e34656842/start")
|
||||
vm._add_ubridge_connection.assert_called_once_with(nio, 0)
|
||||
@ -510,7 +510,7 @@ def test_start_namespace_failed(loop, vm, manager, free_console_port):
|
||||
vm.adapters = 1
|
||||
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
@ -520,7 +520,7 @@ def test_start_namespace_failed(loop, vm, manager, free_console_port):
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_log", return_value='Hello not available') as mock_log:
|
||||
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
|
||||
mock_query.assert_any_call("POST", "containers/e90e34656842/start")
|
||||
mock_add_ubridge_connection.assert_called_once_with(nio, 0)
|
||||
@ -542,7 +542,7 @@ def test_start_without_nio(loop, vm, manager, free_console_port):
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_namespace", return_value=42) as mock_namespace:
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._add_ubridge_connection") as mock_add_ubridge_connection:
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._start_console") as mock_start_console:
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
|
||||
mock_query.assert_called_with("POST", "containers/e90e34656842/start")
|
||||
assert mock_add_ubridge_connection.called
|
||||
@ -555,7 +555,7 @@ def test_start_unpause(loop, vm, manager, free_console_port):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="paused"):
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM.unpause", return_value="paused") as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert mock.called
|
||||
assert vm.status == "started"
|
||||
|
||||
@ -563,7 +563,7 @@ def test_start_unpause(loop, vm, manager, free_console_port):
|
||||
def test_restart(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.restart()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.restart()))
|
||||
|
||||
mock.assert_called_with("POST", "containers/e90e34656842/restart")
|
||||
|
||||
@ -576,7 +576,7 @@ def test_stop(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="running"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
mock_query.assert_called_with("POST", "containers/e90e34656842/stop", params={"t": 5})
|
||||
assert mock.stop.called
|
||||
assert vm._ubridge_hypervisor is None
|
||||
@ -588,7 +588,7 @@ def test_stop_paused_container(loop, vm):
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="paused"):
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM.unpause") as mock_unpause:
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
mock_query.assert_called_with("POST", "containers/e90e34656842/stop", params={"t": 5})
|
||||
assert mock_unpause.called
|
||||
|
||||
@ -606,7 +606,7 @@ def test_update(loop, vm):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]) as mock_list_images:
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.update()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.update()))
|
||||
|
||||
mock_query.assert_any_call("DELETE", "containers/e90e34656842", params={"force": 1, "v": 1})
|
||||
mock_query.assert_any_call("POST", "containers/create", data={
|
||||
@ -656,7 +656,7 @@ def test_update_vnc(loop, vm):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]) as mock_list_images:
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.update()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.update()))
|
||||
|
||||
assert vm.console == original_console
|
||||
assert vm.aux == original_aux
|
||||
@ -675,7 +675,7 @@ def test_update_running(loop, vm):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.list_images", return_value=[{"image": "ubuntu"}]) as mock_list_images:
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="running"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.update()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.update()))
|
||||
|
||||
mock_query.assert_any_call("DELETE", "containers/e90e34656842", params={"force": 1, "v": 1})
|
||||
mock_query.assert_any_call("POST", "containers/create", data={
|
||||
@ -713,7 +713,7 @@ def test_delete(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.delete()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.delete()))
|
||||
mock_query.assert_called_with("DELETE", "containers/e90e34656842", params={"force": 1, "v": 1})
|
||||
|
||||
|
||||
@ -723,11 +723,11 @@ def test_close(loop, vm, port_manager):
|
||||
"rport": 4343,
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
mock_query.assert_called_with("DELETE", "containers/e90e34656842", params={"force": 1, "v": 1})
|
||||
|
||||
assert vm._closed is True
|
||||
@ -742,7 +742,7 @@ def test_close_vnc(loop, vm, port_manager):
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="stopped"):
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query") as mock_query:
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
mock_query.assert_called_with("DELETE", "containers/e90e34656842", params={"force": 1, "v": 1})
|
||||
|
||||
assert vm._closed is True
|
||||
@ -756,7 +756,7 @@ def test_get_namespace(loop, vm):
|
||||
}
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock_query:
|
||||
assert loop.run_until_complete(asyncio.async(vm._get_namespace())) == 42
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm._get_namespace())) == 42
|
||||
mock_query.assert_called_with("GET", "containers/e90e34656842/json")
|
||||
|
||||
|
||||
@ -771,7 +771,7 @@ def test_add_ubridge_connection(loop, vm):
|
||||
vm._ubridge_hypervisor = MagicMock()
|
||||
vm._namespace = 42
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm._add_ubridge_connection(nio, 0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._add_ubridge_connection(nio, 0)))
|
||||
|
||||
calls = [
|
||||
call.send('bridge create bridge0'),
|
||||
@ -792,7 +792,7 @@ def test_add_ubridge_connection_none_nio(loop, vm):
|
||||
vm._ubridge_hypervisor = MagicMock()
|
||||
vm._namespace = 42
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm._add_ubridge_connection(nio, 0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._add_ubridge_connection(nio, 0)))
|
||||
|
||||
calls = [
|
||||
call.send('bridge create bridge0'),
|
||||
@ -813,7 +813,7 @@ def test_add_ubridge_connection_invalid_adapter_number(loop, vm):
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm._add_ubridge_connection(nio, 12)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._add_ubridge_connection(nio, 12)))
|
||||
|
||||
|
||||
def test_add_ubridge_connection_no_free_interface(loop, vm):
|
||||
@ -829,7 +829,7 @@ def test_add_ubridge_connection_no_free_interface(loop, vm):
|
||||
interfaces = ["tap-gns3-e{}".format(index) for index in range(4096)]
|
||||
|
||||
with patch("psutil.net_if_addrs", return_value=interfaces):
|
||||
loop.run_until_complete(asyncio.async(vm._add_ubridge_connection(nio, 0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._add_ubridge_connection(nio, 0)))
|
||||
|
||||
|
||||
def test_adapter_add_nio_binding(vm, loop):
|
||||
@ -838,7 +838,7 @@ def test_adapter_add_nio_binding(vm, loop):
|
||||
"rport": 4343,
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0) == nio
|
||||
|
||||
|
||||
@ -853,9 +853,9 @@ def test_adapter_udpate_nio_binding(vm, loop):
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="running"):
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_update_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_update_nio_binding(0, nio)))
|
||||
assert vm._ubridge_apply_filters.called
|
||||
|
||||
|
||||
@ -867,9 +867,9 @@ def test_adapter_udpate_nio_binding_bridge_not_started(vm, loop):
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._get_container_state", return_value="running"):
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_update_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_update_nio_binding(0, nio)))
|
||||
assert vm._ubridge_apply_filters.called is False
|
||||
|
||||
|
||||
@ -880,7 +880,7 @@ def test_adapter_add_nio_binding_invalid_adapter(vm, loop):
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(12, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(12, nio)))
|
||||
|
||||
|
||||
def test_adapter_remove_nio_binding(vm, loop):
|
||||
@ -892,10 +892,10 @@ def test_adapter_remove_nio_binding(vm, loop):
|
||||
"rport": 4343,
|
||||
"rhost": "127.0.0.1"}
|
||||
nio = vm.manager.create_nio(nio)
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.DockerVM._ubridge_send") as delete_ubridge_mock:
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_remove_nio_binding(0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_remove_nio_binding(0)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0) is None
|
||||
delete_ubridge_mock.assert_any_call('bridge stop bridge0')
|
||||
delete_ubridge_mock.assert_any_call('bridge remove_nio_udp bridge0 4242 127.0.0.1 4343')
|
||||
@ -903,15 +903,15 @@ def test_adapter_remove_nio_binding(vm, loop):
|
||||
|
||||
def test_adapter_remove_nio_binding_invalid_adapter(vm, loop):
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_remove_nio_binding(12)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_remove_nio_binding(12)))
|
||||
|
||||
|
||||
def test_start_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.async(vm.start_capture(0, output_file)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start_capture(0, output_file)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
||||
|
||||
|
||||
@ -919,10 +919,10 @@ def test_stop_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(vm.start_capture(0, output_file))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
||||
loop.run_until_complete(asyncio.async(vm.stop_capture(0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop_capture(0)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing is False
|
||||
|
||||
|
||||
@ -935,7 +935,7 @@ def test_get_log(loop, vm):
|
||||
mock_query.read = read
|
||||
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.http_query", return_value=mock_query) as mock:
|
||||
images = loop.run_until_complete(asyncio.async(vm._get_log()))
|
||||
images = loop.run_until_complete(asyncio.ensure_future(vm._get_log()))
|
||||
mock.assert_called_with("GET", "containers/e90e34656842/logs", params={"stderr": 1, "stdout": 1}, data={})
|
||||
|
||||
|
||||
@ -944,7 +944,7 @@ def test_get_image_informations(project, manager, loop):
|
||||
}
|
||||
with asyncio_patch("gns3server.compute.docker.Docker.query", return_value=response) as mock:
|
||||
vm = DockerVM("test", str(uuid.uuid4()), project, manager, "ubuntu")
|
||||
loop.run_until_complete(asyncio.async(vm._get_image_information()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._get_image_information()))
|
||||
mock.assert_called_with("GET", "images/ubuntu:latest/json")
|
||||
|
||||
|
||||
@ -974,7 +974,7 @@ def test_start_vnc(vm, loop):
|
||||
with patch("shutil.which", return_value="/bin/x"):
|
||||
with asyncio_patch("gns3server.compute.docker.docker_vm.wait_for_file_creation") as mock_wait:
|
||||
with asyncio_patch("asyncio.create_subprocess_exec") as mock_exec:
|
||||
loop.run_until_complete(asyncio.async(vm._start_vnc()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._start_vnc()))
|
||||
assert vm._display is not None
|
||||
mock_exec.assert_any_call("Xvfb", "-nolisten", "tcp", ":{}".format(vm._display), "-screen", "0", "1280x1024x16")
|
||||
mock_exec.assert_any_call("x11vnc", "-forever", "-nopw", "-shared", "-geometry", "1280x1024", "-display", "WAIT:{}".format(vm._display), "-rfbport", str(vm.console), "-rfbportv6", str(vm.console), "-noncache", "-listen", "127.0.0.1")
|
||||
@ -983,13 +983,13 @@ def test_start_vnc(vm, loop):
|
||||
|
||||
def test_start_vnc_xvfb_missing(vm, loop):
|
||||
with pytest.raises(DockerError):
|
||||
loop.run_until_complete(asyncio.async(vm._start_vnc()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._start_vnc()))
|
||||
|
||||
|
||||
def test_start_aux(vm, loop):
|
||||
|
||||
with asyncio_patch("asyncio.subprocess.create_subprocess_exec", return_value=MagicMock()) as mock_exec:
|
||||
loop.run_until_complete(asyncio.async(vm._start_aux()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._start_aux()))
|
||||
mock_exec.assert_called_with('docker', 'exec', '-i', 'e90e34656842', '/gns3/bin/busybox', 'script', '-qfc', 'while true; do TERM=vt100 /gns3/bin/busybox sh; done', '/dev/null', stderr=asyncio.subprocess.STDOUT, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
|
||||
|
||||
|
||||
@ -1051,5 +1051,5 @@ def test_read_console_output_with_binary_mode(vm, loop):
|
||||
output_stream = MagicMock()
|
||||
|
||||
with asyncio_patch('gns3server.compute.docker.docker_vm.DockerVM.stop'):
|
||||
loop.run_until_complete(asyncio.async(vm._read_console_output(input_stream, output_stream)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._read_console_output(input_stream, output_stream)))
|
||||
output_stream.feed_data.assert_called_once_with(b"test")
|
||||
|
@ -70,6 +70,6 @@ def test_router_invalid_dynamips_path(project, manager, loop):
|
||||
|
||||
with pytest.raises(DynamipsError):
|
||||
router = Router("test", "00010203-0405-0607-0809-0a0b0c0d0e0e", project, manager)
|
||||
loop.run_until_complete(asyncio.async(router.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(router.create()))
|
||||
assert router.name == "test"
|
||||
assert router.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
||||
|
@ -101,7 +101,7 @@ def test_start(loop, vm):
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=mock_process) as mock_exec:
|
||||
mock_process.returncode = None
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.is_running()
|
||||
assert vm.command_line == ' '.join(mock_exec.call_args[0])
|
||||
|
||||
@ -129,7 +129,7 @@ def test_start_with_iourc(loop, vm, tmpdir):
|
||||
with patch("gns3server.config.Config.get_section_config", return_value={"iourc_path": fake_file}):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=mock_process) as exec_mock:
|
||||
mock_process.returncode = None
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.is_running()
|
||||
arsgs, kwargs = exec_mock.call_args
|
||||
assert kwargs["env"]["IOURC"] == fake_file
|
||||
@ -167,10 +167,10 @@ def test_stop(loop, vm):
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
process.returncode = None
|
||||
assert vm.is_running()
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
@ -192,9 +192,9 @@ def test_reload(loop, vm, fake_iou_bin):
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.is_running()
|
||||
loop.run_until_complete(asyncio.async(vm.reload()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.reload()))
|
||||
assert vm.is_running() is True
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
@ -204,7 +204,7 @@ def test_close(vm, port_manager, loop):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||
vm.start()
|
||||
port = vm.console
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
# Raise an exception if the port is not free
|
||||
port_manager.reserve_tcp_port(port, vm.project)
|
||||
assert vm.is_running() is False
|
||||
@ -249,7 +249,7 @@ def test_create_netmap_config(vm):
|
||||
|
||||
def test_build_command(vm, loop):
|
||||
|
||||
assert loop.run_until_complete(asyncio.async(vm._build_command())) == [vm.path, str(vm.application_id)]
|
||||
assert loop.run_until_complete(asyncio.ensure_future(vm._build_command())) == [vm.path, str(vm.application_id)]
|
||||
|
||||
|
||||
def test_get_startup_config(vm):
|
||||
@ -311,11 +311,11 @@ def test_library_check(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="") as mock:
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm._library_check()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._library_check()))
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="libssl => not found") as mock:
|
||||
with pytest.raises(IOUError):
|
||||
loop.run_until_complete(asyncio.async(vm._library_check()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._library_check()))
|
||||
|
||||
|
||||
def test_enable_l1_keepalives(loop, vm):
|
||||
@ -323,14 +323,14 @@ def test_enable_l1_keepalives(loop, vm):
|
||||
with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="***************************************************************\n\n-l Enable Layer 1 keepalive messages\n-u <n> UDP port base for distributed networks\n") as mock:
|
||||
|
||||
command = ["test"]
|
||||
loop.run_until_complete(asyncio.async(vm._enable_l1_keepalives(command)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._enable_l1_keepalives(command)))
|
||||
assert command == ["test", "-l"]
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.subprocess_check_output", return_value="***************************************************************\n\n-u <n> UDP port base for distributed networks\n") as mock:
|
||||
|
||||
command = ["test"]
|
||||
with pytest.raises(IOUError):
|
||||
loop.run_until_complete(asyncio.async(vm._enable_l1_keepalives(command)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._enable_l1_keepalives(command)))
|
||||
assert command == ["test"]
|
||||
|
||||
|
||||
@ -338,8 +338,8 @@ def test_start_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, 0, nio)))
|
||||
loop.run_until_complete(asyncio.async(vm.start_capture(0, 0, output_file)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, 0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start_capture(0, 0, output_file)))
|
||||
assert vm._adapters[0].get_nio(0).capturing
|
||||
|
||||
|
||||
@ -347,10 +347,10 @@ def test_stop_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, 0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, 0, nio)))
|
||||
loop.run_until_complete(vm.start_capture(0, 0, output_file))
|
||||
assert vm._adapters[0].get_nio(0).capturing
|
||||
loop.run_until_complete(asyncio.async(vm.stop_capture(0, 0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop_capture(0, 0)))
|
||||
assert vm._adapters[0].get_nio(0).capturing is False
|
||||
|
||||
|
||||
@ -363,42 +363,42 @@ def test_invalid_iou_file(loop, vm, iourc_file):
|
||||
|
||||
hostname = socket.gethostname()
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Missing ;
|
||||
with pytest.raises(IOUError):
|
||||
with open(iourc_file, "w+") as f:
|
||||
f.write("[license]\n{} = aaaaaaaaaaaaaaaa".format(hostname))
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Key too short
|
||||
with pytest.raises(IOUError):
|
||||
with open(iourc_file, "w+") as f:
|
||||
f.write("[license]\n{} = aaaaaaaaaaaaaa;".format(hostname))
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Invalid hostname
|
||||
with pytest.raises(IOUError):
|
||||
with open(iourc_file, "w+") as f:
|
||||
f.write("[license]\nbla = aaaaaaaaaaaaaa;")
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Missing licence section
|
||||
with pytest.raises(IOUError):
|
||||
with open(iourc_file, "w+") as f:
|
||||
f.write("[licensetest]\n{} = aaaaaaaaaaaaaaaa;")
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Broken config file
|
||||
with pytest.raises(IOUError):
|
||||
with open(iourc_file, "w+") as f:
|
||||
f.write("[")
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
# Missing file
|
||||
with pytest.raises(IOUError):
|
||||
os.remove(iourc_file)
|
||||
loop.run_until_complete(asyncio.async(vm._check_iou_licence()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_iou_licence()))
|
||||
|
||||
|
||||
def test_iourc_content(vm):
|
||||
|
@ -69,5 +69,5 @@ def test_rebase(tmpdir, loop):
|
||||
qcow2 = Qcow2(str(tmpdir / "linked.qcow2"))
|
||||
assert qcow2.version == 3
|
||||
assert qcow2.backing_file == "empty8G.qcow2"
|
||||
loop.run_until_complete(asyncio.async(qcow2.rebase(qemu_img(), str(tmpdir / "empty16G.qcow2"))))
|
||||
loop.run_until_complete(asyncio.ensure_future(qcow2.rebase(qemu_img(), str(tmpdir / "empty16G.qcow2"))))
|
||||
assert qcow2.backing_file == str(tmpdir / "empty16G.qcow2")
|
||||
|
@ -41,7 +41,7 @@ def fake_qemu_img_binary(tmpdir):
|
||||
def test_get_qemu_version(loop):
|
||||
|
||||
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="QEMU emulator version 2.2.0, Copyright (c) 2003-2008 Fabrice Bellard") as mock:
|
||||
version = loop.run_until_complete(asyncio.async(Qemu.get_qemu_version("/tmp/qemu-test")))
|
||||
version = loop.run_until_complete(asyncio.ensure_future(Qemu.get_qemu_version("/tmp/qemu-test")))
|
||||
if sys.platform.startswith("win"):
|
||||
assert version == ""
|
||||
else:
|
||||
@ -64,7 +64,7 @@ def test_binary_list(loop):
|
||||
else:
|
||||
version = "2.2.0"
|
||||
|
||||
qemus = loop.run_until_complete(asyncio.async(Qemu.binary_list()))
|
||||
qemus = loop.run_until_complete(asyncio.ensure_future(Qemu.binary_list()))
|
||||
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86"), "version": version} in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-kvm"), "version": version} in qemus
|
||||
@ -72,14 +72,14 @@ def test_binary_list(loop):
|
||||
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86_64-spice"), "version": version} not in qemus
|
||||
|
||||
qemus = loop.run_until_complete(asyncio.async(Qemu.binary_list(["x86"])))
|
||||
qemus = loop.run_until_complete(asyncio.ensure_future(Qemu.binary_list(["x86"])))
|
||||
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86"), "version": version} in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-kvm"), "version": version} not in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x42"), "version": version} not in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "hello"), "version": version} not in qemus
|
||||
|
||||
qemus = loop.run_until_complete(asyncio.async(Qemu.binary_list(["x86", "x42"])))
|
||||
qemus = loop.run_until_complete(asyncio.ensure_future(Qemu.binary_list(["x86", "x42"])))
|
||||
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-system-x86"), "version": version} in qemus
|
||||
assert {"path": os.path.join(os.environ["PATH"], "qemu-kvm"), "version": version} not in qemus
|
||||
@ -98,7 +98,7 @@ def test_img_binary_list(loop):
|
||||
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
||||
|
||||
with asyncio_patch("gns3server.compute.qemu.subprocess_check_output", return_value="qemu-img version 2.2.0, Copyright (c) 2004-2008 Fabrice Bellard") as mock:
|
||||
qemus = loop.run_until_complete(asyncio.async(Qemu.img_binary_list()))
|
||||
qemus = loop.run_until_complete(asyncio.ensure_future(Qemu.img_binary_list()))
|
||||
|
||||
version = "2.2.0"
|
||||
|
||||
@ -125,7 +125,7 @@ def test_create_image_abs_path(loop, tmpdir, fake_qemu_img_binary):
|
||||
"size": 100
|
||||
}
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(fake_qemu_img_binary, str(tmpdir / "hda.qcow2"), options)))
|
||||
loop.run_until_complete(asyncio.ensure_future(Qemu.instance().create_disk(fake_qemu_img_binary, str(tmpdir / "hda.qcow2"), options)))
|
||||
args, kwargs = process.call_args
|
||||
assert args == (
|
||||
fake_qemu_img_binary,
|
||||
@ -152,7 +152,7 @@ def test_create_image_relative_path(loop, tmpdir, fake_qemu_img_binary):
|
||||
}
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
with patch("gns3server.compute.qemu.Qemu.get_images_directory", return_value=str(tmpdir)):
|
||||
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)))
|
||||
loop.run_until_complete(asyncio.ensure_future(Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)))
|
||||
args, kwargs = process.call_args
|
||||
assert args == (
|
||||
fake_qemu_img_binary,
|
||||
@ -174,7 +174,7 @@ def test_create_image_exist(loop, tmpdir, fake_qemu_img_binary):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
with patch("gns3server.compute.qemu.Qemu.get_images_directory", return_value=str(tmpdir)):
|
||||
with pytest.raises(QemuError):
|
||||
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)))
|
||||
loop.run_until_complete(asyncio.ensure_future(Qemu.instance().create_disk(fake_qemu_img_binary, "hda.qcow2", options)))
|
||||
assert not process.called
|
||||
|
||||
|
||||
@ -193,7 +193,7 @@ def test_create_image_with_not_supported_characters_by_filesystem(loop, tmpdir,
|
||||
patch("os.makedirs"):
|
||||
|
||||
with pytest.raises(QemuError):
|
||||
loop.run_until_complete(asyncio.async(Qemu.instance().create_disk(
|
||||
loop.run_until_complete(asyncio.ensure_future(Qemu.instance().create_disk(
|
||||
fake_qemu_img_binary, "hda.qcow2", options)))
|
||||
assert not process.called
|
||||
|
||||
@ -201,12 +201,12 @@ def test_create_image_with_not_supported_characters_by_filesystem(loop, tmpdir,
|
||||
def test_get_kvm_archs_kvm_ok(loop):
|
||||
|
||||
with patch("os.path.exists", return_value=True):
|
||||
archs = loop.run_until_complete(asyncio.async(Qemu.get_kvm_archs()))
|
||||
archs = loop.run_until_complete(asyncio.ensure_future(Qemu.get_kvm_archs()))
|
||||
if platform.machine() == 'x86_64':
|
||||
assert archs == ['x86_64', 'i386']
|
||||
else:
|
||||
assert archs == [platform.machine()]
|
||||
|
||||
with patch("os.path.exists", return_value=False):
|
||||
archs = loop.run_until_complete(asyncio.async(Qemu.get_kvm_archs()))
|
||||
archs = loop.run_until_complete(asyncio.ensure_future(Qemu.get_kvm_archs()))
|
||||
assert archs == []
|
||||
|
@ -118,7 +118,7 @@ def test_is_running(vm, running_subprocess_mock):
|
||||
def test_start(loop, vm, running_subprocess_mock):
|
||||
with asyncio_patch("gns3server.compute.qemu.QemuVM.start_wrap_console"):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=running_subprocess_mock) as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.is_running()
|
||||
assert vm.command_line == ' '.join(mock.call_args[0])
|
||||
|
||||
@ -135,9 +135,9 @@ def test_stop(loop, vm, running_subprocess_mock):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process):
|
||||
nio = Qemu.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.adapter_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.is_running()
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
process.terminate.assert_called_with()
|
||||
|
||||
@ -184,7 +184,7 @@ def test_termination_callback_error(vm, tmpdir, async_run):
|
||||
def test_reload(loop, vm):
|
||||
|
||||
with asyncio_patch("gns3server.compute.qemu.QemuVM._control_vm") as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.reload()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.reload()))
|
||||
assert mock.called_with("system_reset")
|
||||
|
||||
|
||||
@ -193,33 +193,33 @@ def test_suspend(loop, vm):
|
||||
control_vm_result = MagicMock()
|
||||
control_vm_result.match.group.decode.return_value = "running"
|
||||
with asyncio_patch("gns3server.compute.qemu.QemuVM._control_vm", return_value=control_vm_result) as mock:
|
||||
loop.run_until_complete(asyncio.async(vm.suspend()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.suspend()))
|
||||
assert mock.called_with("system_reset")
|
||||
|
||||
|
||||
def test_add_nio_binding_udp(vm, loop):
|
||||
nio = Qemu.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
|
||||
assert nio.lport == 4242
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
assert nio.lport == 4242
|
||||
|
||||
|
||||
|
||||
def test_port_remove_nio_binding(vm, loop):
|
||||
nio = Qemu.instance().create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_remove_nio_binding(0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_remove_nio_binding(0)))
|
||||
assert vm._ethernet_adapters[0].ports[0] is None
|
||||
|
||||
|
||||
def test_close(vm, port_manager, loop):
|
||||
with asyncio_patch("gns3server.compute.qemu.QemuVM.start_wrap_console"):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
|
||||
console_port = vm.console
|
||||
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
|
||||
# Raise an exception if the port is not free
|
||||
port_manager.reserve_tcp_port(console_port, vm.project)
|
||||
@ -328,7 +328,7 @@ def test_disk_options(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
open(vm._hda_disk_image, "w+").close()
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
options = loop.run_until_complete(asyncio.async(vm._disk_options()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._disk_options()))
|
||||
assert process.called
|
||||
args, kwargs = process.call_args
|
||||
assert args == (fake_qemu_img_binary, "create", "-o", "backing_file={}".format(vm._hda_disk_image), "-f", "qcow2", os.path.join(vm.working_dir, "hda_disk.qcow2"))
|
||||
@ -341,7 +341,7 @@ def test_cdrom_option(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
vm._cdrom_image = str(tmpdir / "test.iso")
|
||||
open(vm._cdrom_image, "w+").close()
|
||||
|
||||
options = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
|
||||
assert ' '.join(['-cdrom', str(tmpdir / "test.iso")]) in ' '.join(options)
|
||||
|
||||
@ -351,7 +351,7 @@ def test_bios_option(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
vm._bios_image = str(tmpdir / "test.img")
|
||||
open(vm._bios_image, "w+").close()
|
||||
|
||||
options = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
|
||||
assert ' '.join(['-bios', str(tmpdir / "test.img")]) in ' '.join(options)
|
||||
|
||||
@ -359,14 +359,14 @@ def test_bios_option(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
def test_vnc_option(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
vm._console_type = 'vnc'
|
||||
vm._console = 5905
|
||||
options = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert '-vnc 127.0.0.1:5' in ' '.join(options)
|
||||
|
||||
|
||||
def test_spice_option(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
vm._console_type = 'spice'
|
||||
vm._console = 5905
|
||||
options = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert '-spice addr=127.0.0.1,port=5905,disable-ticketing' in ' '.join(options)
|
||||
assert '-vga qxl' in ' '.join(options)
|
||||
|
||||
@ -383,7 +383,7 @@ def test_disk_options_multiple_disk(vm, tmpdir, loop, fake_qemu_img_binary):
|
||||
open(vm._hdd_disk_image, "w+").close()
|
||||
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
options = loop.run_until_complete(asyncio.async(vm._disk_options()))
|
||||
options = loop.run_until_complete(asyncio.ensure_future(vm._disk_options()))
|
||||
|
||||
assert options == [
|
||||
'-drive', 'file=' + os.path.join(vm.working_dir, "hda_disk.qcow2") + ',if=ide,index=0,media=disk',
|
||||
@ -400,7 +400,7 @@ def test_set_process_priority(vm, loop, fake_qemu_img_binary):
|
||||
vm._process = MagicMock()
|
||||
vm._process.pid = 42
|
||||
vm._process_priority = "low"
|
||||
loop.run_until_complete(asyncio.async(vm._set_process_priority()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._set_process_priority()))
|
||||
assert process.called
|
||||
args, kwargs = process.call_args
|
||||
assert args == ("renice", "-n", "5", "-p", "42")
|
||||
@ -412,7 +412,7 @@ def test_set_process_priority_normal(vm, loop, fake_qemu_img_binary):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
vm._process = MagicMock()
|
||||
vm._process.pid = 42
|
||||
loop.run_until_complete(asyncio.async(vm._set_process_priority()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._set_process_priority()))
|
||||
assert not process.called
|
||||
|
||||
|
||||
@ -429,7 +429,7 @@ def test_control_vm(vm, loop):
|
||||
reader = MagicMock()
|
||||
writer = MagicMock()
|
||||
with asyncio_patch("asyncio.open_connection", return_value=(reader, writer)) as open_connect:
|
||||
res = loop.run_until_complete(asyncio.async(vm._control_vm("test")))
|
||||
res = loop.run_until_complete(asyncio.ensure_future(vm._control_vm("test")))
|
||||
assert writer.write.called_with("test")
|
||||
assert res is None
|
||||
|
||||
@ -446,7 +446,7 @@ def test_control_vm_expect_text(vm, loop, running_subprocess_mock):
|
||||
reader.readline.return_value = future
|
||||
|
||||
vm._monitor = 4242
|
||||
res = loop.run_until_complete(asyncio.async(vm._control_vm("test", [b"epic"])))
|
||||
res = loop.run_until_complete(asyncio.ensure_future(vm._control_vm("test", [b"epic"])))
|
||||
assert writer.write.called_with("test")
|
||||
|
||||
assert res == "epic product"
|
||||
@ -456,7 +456,7 @@ def test_build_command(vm, loop, fake_qemu_binary, port_manager):
|
||||
|
||||
os.environ["DISPLAY"] = "0:0"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
nio = vm._local_udp_tunnels[0][0]
|
||||
assert cmd == [
|
||||
fake_qemu_binary,
|
||||
@ -489,7 +489,7 @@ def test_build_command_manual_uuid(vm, loop, fake_qemu_binary, port_manager):
|
||||
vm.options = "-uuid e1c307a4-896f-11e6-81a5-3c07547807cc"
|
||||
os.environ["DISPLAY"] = "0:0"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert "e1c307a4-896f-11e6-81a5-3c07547807cc" in cmd
|
||||
assert vm.id not in cmd
|
||||
|
||||
@ -502,7 +502,7 @@ def test_build_command_kvm(linux_platform, vm, loop, fake_qemu_binary, port_mana
|
||||
vm.manager.get_qemu_version = AsyncioMagicMock(return_value="2.3.2")
|
||||
os.environ["DISPLAY"] = "0:0"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
nio = vm._local_udp_tunnels[0][0]
|
||||
assert cmd == [
|
||||
fake_qemu_binary,
|
||||
@ -536,7 +536,7 @@ def test_build_command_kvm_2_4(linux_platform, vm, loop, fake_qemu_binary, port_
|
||||
vm.manager.get_qemu_version = AsyncioMagicMock(return_value="2.4.2")
|
||||
os.environ["DISPLAY"] = "0:0"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
nio = vm._local_udp_tunnels[0][0]
|
||||
assert cmd == [
|
||||
fake_qemu_binary,
|
||||
@ -569,7 +569,7 @@ def test_build_command_without_display(vm, loop, fake_qemu_binary):
|
||||
|
||||
os.environ["DISPLAY"] = ""
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert "-nographic" in cmd
|
||||
|
||||
|
||||
@ -578,7 +578,7 @@ def test_build_command_two_adapters(vm, loop, fake_qemu_binary, port_manager):
|
||||
os.environ["DISPLAY"] = "0:0"
|
||||
vm.adapters = 2
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
nio1 = vm._local_udp_tunnels[0][0]
|
||||
nio2 = vm._local_udp_tunnels[1][0]
|
||||
assert cmd == [
|
||||
@ -619,7 +619,7 @@ def test_build_command_two_adapters_mac_address(vm, loop, fake_qemu_binary, port
|
||||
mac_1 = int_to_macaddress(macaddress_to_int(vm._mac_address) + 1)
|
||||
assert mac_0[:8] == "00:00:ab"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert "e1000,mac={},netdev=gns3-0".format(mac_0) in cmd
|
||||
assert "e1000,mac={},netdev=gns3-1".format(mac_1) in cmd
|
||||
|
||||
@ -628,7 +628,7 @@ def test_build_command_two_adapters_mac_address(vm, loop, fake_qemu_binary, port
|
||||
mac_1 = int_to_macaddress(macaddress_to_int(vm._mac_address) + 1)
|
||||
assert mac_0[:8] == "00:42:ab"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
assert "e1000,mac={},netdev=gns3-0".format(mac_0) in cmd
|
||||
assert "e1000,mac={},netdev=gns3-1".format(mac_1) in cmd
|
||||
|
||||
@ -648,7 +648,7 @@ def test_build_command_large_number_of_adapters(vm, loop, fake_qemu_binary, port
|
||||
mac_1 = int_to_macaddress(macaddress_to_int(vm._mac_address) + 1)
|
||||
assert mac_0[:8] == "00:00:ab"
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
|
||||
# Count if we have 100 e1000 adapters in the command
|
||||
assert len([l for l in cmd if "e1000" in l ]) == 100
|
||||
@ -672,10 +672,10 @@ def test_build_command_large_number_of_adapters(vm, loop, fake_qemu_binary, port
|
||||
vm.manager.get_qemu_version = AsyncioMagicMock(return_value="2.0.0")
|
||||
with pytest.raises(QemuError):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
vm.adapters = 5
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()) as process:
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
|
||||
# Windows accept this kind of mistake
|
||||
|
||||
@ -685,7 +685,7 @@ def test_build_command_with_invalid_options(vm, loop, fake_qemu_binary):
|
||||
|
||||
vm.options = "'test"
|
||||
with pytest.raises(QemuError):
|
||||
cmd = loop.run_until_complete(asyncio.async(vm._build_command()))
|
||||
cmd = loop.run_until_complete(asyncio.ensure_future(vm._build_command()))
|
||||
|
||||
|
||||
def test_hda_disk_image(vm, images_dir):
|
||||
|
@ -83,7 +83,7 @@ def test_close(node, loop, port_manager):
|
||||
|
||||
node.aux = aux
|
||||
port = node.console
|
||||
assert loop.run_until_complete(asyncio.async(node.close()))
|
||||
assert loop.run_until_complete(asyncio.ensure_future(node.close()))
|
||||
# Raise an exception if the port is not free
|
||||
port_manager.reserve_tcp_port(port, node.project)
|
||||
# Raise an exception if the port is not free
|
||||
@ -92,7 +92,7 @@ def test_close(node, loop, port_manager):
|
||||
assert node.aux is None
|
||||
|
||||
# Called twice closed should return False
|
||||
assert loop.run_until_complete(asyncio.async(node.close())) is False
|
||||
assert loop.run_until_complete(asyncio.ensure_future(node.close())) is False
|
||||
|
||||
|
||||
def test_aux(project, manager, port_manager):
|
||||
|
@ -43,7 +43,7 @@ def manager(port_manager):
|
||||
@pytest.fixture(scope="function")
|
||||
def node(project, manager, loop):
|
||||
node = manager.create_node("test", project.id, "00010203-0405-0607-0809-0a0b0c0d0e0f")
|
||||
return loop.run_until_complete(asyncio.async(node))
|
||||
return loop.run_until_complete(asyncio.ensure_future(node))
|
||||
|
||||
|
||||
def test_affect_uuid():
|
||||
@ -140,7 +140,7 @@ def test_project_delete(loop):
|
||||
project = Project(project_id=str(uuid4()))
|
||||
directory = project.path
|
||||
assert os.path.exists(directory)
|
||||
loop.run_until_complete(asyncio.async(project.delete()))
|
||||
loop.run_until_complete(asyncio.ensure_future(project.delete()))
|
||||
assert os.path.exists(directory) is False
|
||||
|
||||
|
||||
@ -150,7 +150,7 @@ def test_project_delete_permission_issue(loop):
|
||||
assert os.path.exists(directory)
|
||||
os.chmod(directory, 0)
|
||||
with pytest.raises(aiohttp.web.HTTPInternalServerError):
|
||||
loop.run_until_complete(asyncio.async(project.delete()))
|
||||
loop.run_until_complete(asyncio.ensure_future(project.delete()))
|
||||
os.chmod(directory, 700)
|
||||
|
||||
|
||||
@ -164,7 +164,7 @@ def test_project_add_node(manager):
|
||||
def test_project_close(loop, node, project):
|
||||
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.close") as mock:
|
||||
loop.run_until_complete(asyncio.async(project.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(project.close()))
|
||||
assert mock.called
|
||||
assert node.id not in node.manager._nodes
|
||||
|
||||
@ -181,7 +181,7 @@ def test_list_files(tmpdir, loop):
|
||||
with open(os.path.join(path, "test.txt"), "w+") as f:
|
||||
f.write("test2")
|
||||
|
||||
files = loop.run_until_complete(asyncio.async(project.list_files()))
|
||||
files = loop.run_until_complete(asyncio.ensure_future(project.list_files()))
|
||||
|
||||
assert files == [
|
||||
{
|
||||
@ -210,5 +210,5 @@ def test_emit(async_run):
|
||||
def test_update_project(loop):
|
||||
variables = [{"name": "TEST", "value": "VAL"}]
|
||||
project = Project(project_id=str(uuid.uuid4()))
|
||||
loop.run_until_complete(asyncio.async(project.update(variables=variables)))
|
||||
loop.run_until_complete(asyncio.ensure_future(project.update(variables=variables)))
|
||||
assert project.variables == variables
|
||||
|
@ -56,7 +56,7 @@ def test_vm_invalid_traceng_path(vm, manager, loop):
|
||||
with pytest.raises(TraceNGError):
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
||||
|
||||
@ -72,7 +72,7 @@ def test_start(loop, vm, async_run):
|
||||
with patch("sys.platform", return_value="win"):
|
||||
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
|
||||
loop.run_until_complete(asyncio.async(vm.start("192.168.1.2")))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start("192.168.1.2")))
|
||||
assert mock_exec.call_args[0] == (vm._traceng_path(),
|
||||
'-u',
|
||||
'-c',
|
||||
@ -115,7 +115,7 @@ def test_stop(loop, vm, async_run):
|
||||
assert vm.is_running()
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
|
||||
process.terminate.assert_called_with()
|
||||
@ -175,5 +175,5 @@ def test_close(vm, port_manager, loop):
|
||||
with asyncio_patch("gns3server.compute.traceng.traceng_vm.TraceNGVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||
vm.start()
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
assert vm.is_running() is False
|
||||
|
@ -92,7 +92,7 @@ def test_list_vms(manager, loop):
|
||||
|
||||
with asyncio_patch("gns3server.compute.virtualbox.VirtualBox.execute") as mock:
|
||||
mock.side_effect = execute_mock
|
||||
vms = loop.run_until_complete(asyncio.async(manager.list_vms()))
|
||||
vms = loop.run_until_complete(asyncio.ensure_future(manager.list_vms()))
|
||||
assert vms == [
|
||||
{"vmname": "Windows 8.1", "ram": 512},
|
||||
{"vmname": "Linux Microcore 4.7.1", "ram": 256}
|
||||
|
@ -75,20 +75,20 @@ def test_rename_vmname(project, manager, async_run):
|
||||
def test_vm_valid_virtualbox_api_version(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.virtualbox.VirtualBox.execute", return_value=["API version: 4_3"]):
|
||||
vm = VirtualBoxVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, "test", False)
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
|
||||
|
||||
def test_vm_invalid_virtualbox_api_version(loop, project, manager):
|
||||
with asyncio_patch("gns3server.compute.virtualbox.VirtualBox.execute", return_value=["API version: 4_2"]):
|
||||
with pytest.raises(VirtualBoxError):
|
||||
vm = VirtualBoxVM("test", "00010203-0405-0607-0809-0a0b0c0d0e0f", project, manager, "test", False)
|
||||
loop.run_until_complete(asyncio.async(vm.create()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.create()))
|
||||
|
||||
|
||||
def test_vm_adapter_add_nio_binding_adapter_not_exist(loop, vm, manager, free_console_port):
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
with pytest.raises(VirtualBoxError):
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(15, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(15, nio)))
|
||||
|
||||
|
||||
def test_json(vm, tmpdir, project):
|
||||
|
@ -56,8 +56,8 @@ def test_start_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
vm.adapters = 1
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.async(vm.start_capture(0, output_file)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start_capture(0, output_file)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
||||
|
||||
|
||||
@ -66,8 +66,8 @@ def test_stop_capture(vm, tmpdir, manager, free_console_port, loop):
|
||||
output_file = str(tmpdir / "test.pcap")
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": free_console_port, "rport": free_console_port, "rhost": "127.0.0.1"})
|
||||
vm.adapters = 1
|
||||
loop.run_until_complete(asyncio.async(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.adapter_add_nio_binding(0, nio)))
|
||||
loop.run_until_complete(vm.start_capture(0, output_file))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing
|
||||
loop.run_until_complete(asyncio.async(vm.stop_capture(0)))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop_capture(0)))
|
||||
assert vm._ethernet_adapters[0].get_nio(0).capturing is False
|
||||
|
@ -56,13 +56,13 @@ def test_vm(project, manager):
|
||||
|
||||
def test_vm_check_vpcs_version(loop, vm, manager):
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.9"):
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_vpcs_version()))
|
||||
assert vm._vpcs_version == parse_version("0.9")
|
||||
|
||||
|
||||
def test_vm_check_vpcs_version_0_6_1(loop, vm, manager):
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.subprocess_check_output", return_value="Welcome to Virtual PC Simulator, version 0.6.1"):
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_vpcs_version()))
|
||||
assert vm._vpcs_version == parse_version("0.6.1")
|
||||
|
||||
|
||||
@ -71,7 +71,7 @@ def test_vm_invalid_vpcs_version(loop, manager, vm):
|
||||
with pytest.raises(VPCSError):
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1", "filters": {}})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm._check_vpcs_version()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm._check_vpcs_version()))
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0f"
|
||||
|
||||
@ -81,7 +81,7 @@ def test_vm_invalid_vpcs_path(vm, manager, loop):
|
||||
with pytest.raises(VPCSError):
|
||||
nio = manager.create_nio({"type": "nio_udp", "lport": 4242, "rport": 4243, "rhost": "127.0.0.1"})
|
||||
vm.port_add_nio_binding(0, nio)
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert vm.name == "test"
|
||||
assert vm.id == "00010203-0405-0607-0809-0a0b0c0d0e0e"
|
||||
|
||||
@ -96,7 +96,7 @@ def test_start(loop, vm, async_run):
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=process) as mock_exec:
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM.start_wrap_console"):
|
||||
loop.run_until_complete(asyncio.async(vm.start()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.start()))
|
||||
assert mock_exec.call_args[0] == (vm._vpcs_path(),
|
||||
'-p',
|
||||
str(vm._internal_console_port),
|
||||
@ -169,7 +169,7 @@ def test_stop(loop, vm, async_run):
|
||||
assert vm.is_running()
|
||||
|
||||
with asyncio_patch("gns3server.utils.asyncio.wait_for_process_termination"):
|
||||
loop.run_until_complete(asyncio.async(vm.stop()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.stop()))
|
||||
assert vm.is_running() is False
|
||||
|
||||
if sys.platform.startswith("win"):
|
||||
@ -300,5 +300,5 @@ def test_close(vm, port_manager, loop):
|
||||
with asyncio_patch("gns3server.compute.vpcs.vpcs_vm.VPCSVM._check_requirements", return_value=True):
|
||||
with asyncio_patch("asyncio.create_subprocess_exec", return_value=MagicMock()):
|
||||
vm.start()
|
||||
loop.run_until_complete(asyncio.async(vm.close()))
|
||||
loop.run_until_complete(asyncio.ensure_future(vm.close()))
|
||||
assert vm.is_running() is False
|
||||
|
@ -314,7 +314,7 @@ def async_run(loop):
|
||||
"""
|
||||
Shortcut for running in asyncio loop
|
||||
"""
|
||||
return lambda x: loop.run_until_complete(asyncio.async(x))
|
||||
return lambda x: loop.run_until_complete(asyncio.ensure_future(x))
|
||||
|
||||
|
||||
@pytest.yield_fixture
|
||||
|
@ -74,7 +74,7 @@ class Query:
|
||||
response = yield from self._session.ws_connect(self.get_url(path))
|
||||
future.set_result(response)
|
||||
future = asyncio.Future()
|
||||
asyncio.async(go_request(future))
|
||||
asyncio.ensure_future(go_request(future))
|
||||
self._loop.run_until_complete(future)
|
||||
return future.result()
|
||||
|
||||
@ -85,7 +85,7 @@ class Query:
|
||||
- example if True the session is included inside documentation
|
||||
- raw do not JSON encode the query
|
||||
"""
|
||||
return self._loop.run_until_complete(asyncio.async(self._async_fetch(method, path, body=body, **kwargs)))
|
||||
return self._loop.run_until_complete(asyncio.ensure_future(self._async_fetch(method, path, body=body, **kwargs)))
|
||||
|
||||
@asyncio.coroutine
|
||||
def _async_fetch(self, method, path, body=None, **kwargs):
|
||||
|
@ -356,7 +356,7 @@ def test_pcap(http_controller, tmpdir, project, compute, loop):
|
||||
project._links = {link.id: link}
|
||||
|
||||
future = asyncio.Future()
|
||||
asyncio.async(go(future))
|
||||
asyncio.ensure_future(go(future))
|
||||
response = loop.run_until_complete(future)
|
||||
assert response.status == 200
|
||||
assert b'hello' == response.body
|
||||
|
@ -179,7 +179,7 @@ def test_notification(http_controller, project, controller, loop, async_run):
|
||||
response.close()
|
||||
return response
|
||||
|
||||
response = async_run(asyncio.async(go()))
|
||||
response = async_run(asyncio.ensure_future(go()))
|
||||
assert response.status == 200
|
||||
assert b'"action": "ping"' in response.body
|
||||
assert b'"cpu_usage_percent"' in response.body
|
||||
|
@ -31,7 +31,7 @@ def test_wait_run_in_executor(loop):
|
||||
return param
|
||||
|
||||
exec = wait_run_in_executor(change_var, "test")
|
||||
result = loop.run_until_complete(asyncio.async(exec))
|
||||
result = loop.run_until_complete(asyncio.ensure_future(exec))
|
||||
assert result == "test"
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ def test_exception_wait_run_in_executor(loop):
|
||||
|
||||
exec = wait_run_in_executor(raise_exception)
|
||||
with pytest.raises(Exception):
|
||||
result = loop.run_until_complete(asyncio.async(exec))
|
||||
result = loop.run_until_complete(asyncio.ensure_future(exec))
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not supported on Windows")
|
||||
@ -52,7 +52,7 @@ def test_subprocess_check_output(loop, tmpdir, restore_original_path):
|
||||
with open(path, "w+") as f:
|
||||
f.write("TEST")
|
||||
exec = subprocess_check_output("cat", path)
|
||||
result = loop.run_until_complete(asyncio.async(exec))
|
||||
result = loop.run_until_complete(asyncio.ensure_future(exec))
|
||||
assert result == "TEST"
|
||||
|
||||
|
||||
@ -64,13 +64,13 @@ def test_wait_for_process_termination(loop):
|
||||
process = MagicMock()
|
||||
process.returncode = 0
|
||||
exec = wait_for_process_termination(process)
|
||||
loop.run_until_complete(asyncio.async(exec))
|
||||
loop.run_until_complete(asyncio.ensure_future(exec))
|
||||
|
||||
process = MagicMock()
|
||||
process.returncode = None
|
||||
exec = wait_for_process_termination(process, timeout=0.5)
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
loop.run_until_complete(asyncio.async(exec))
|
||||
loop.run_until_complete(asyncio.ensure_future(exec))
|
||||
|
||||
|
||||
def test_lock_decorator(loop):
|
||||
|
Loading…
Reference in New Issue
Block a user