Merge pull request #638 from GNS3/nat

Nat node for only the GNS3 VM
This commit is contained in:
Jeremy Grossmann 2016-08-21 21:45:11 -06:00 committed by GitHub
commit fe9e824f1c
12 changed files with 756 additions and 222 deletions

View File

@ -18,6 +18,7 @@
from ..error import NodeError
from .nodes.cloud import Cloud
from .nodes.nat import Nat
from .nodes.ethernet_hub import EthernetHub
from .nodes.ethernet_switch import EthernetSwitch
@ -25,6 +26,7 @@ import logging
log = logging.getLogger(__name__)
BUILTIN_NODES = {'cloud': Cloud,
'nat': Nat,
'ethernet_hub': EthernetHub,
'ethernet_switch': EthernetSwitch}

View File

@ -0,0 +1,51 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
from .cloud import Cloud
from ...error import NodeError
class Nat(Cloud):
"""
A portable and preconfigured node allowing topology to get a
nat access to the outside
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if socket.gethostname() != "gns3vm":
raise NodeError("NAT node is supported only on GNS3 VM")
self.ports = [
{
"name": "nat0",
"type": "ethernet",
"interface": "eth1",
"port_number": 1
}
]
def __json__(self):
return {
"name": self.name,
"node_id": self.id,
"project_id": self.project.id,
"status": "started",
"ports": self.ports
}

View File

@ -28,6 +28,7 @@ from .vmware_handler import VMwareHandler
from .version_handler import VersionHandler
from .notification_handler import NotificationHandler
from .cloud_handler import CloudHandler
from .nat_handler import NatHandler
from .ethernet_hub_handler import EthernetHubHandler
from .ethernet_switch_handler import EthernetSwitchHandler
from .frame_relay_switch_handler import FrameRelaySwitchHandler

View File

@ -0,0 +1,267 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from gns3server.web.route import Route
from gns3server.schemas.node import NODE_CAPTURE_SCHEMA
from gns3server.schemas.nio import NIO_SCHEMA
from gns3server.compute.builtin import Builtin
from gns3server.schemas.nat import (
NAT_CREATE_SCHEMA,
NAT_OBJECT_SCHEMA,
NAT_UPDATE_SCHEMA
)
class NatHandler:
"""
API entry points for nat
"""
@Route.post(
r"/projects/{project_id}/nat/nodes",
parameters={
"project_id": "Project UUID"
},
status_codes={
201: "Instance created",
400: "Invalid request",
409: "Conflict"
},
description="Create a new nat instance",
input=NAT_CREATE_SCHEMA,
output=NAT_OBJECT_SCHEMA)
def create(request, response):
builtin_manager = Builtin.instance()
node = yield from builtin_manager.create_node(request.json.pop("name"),
request.match_info["project_id"],
request.json.get("node_id"),
node_type="nat",
ports=request.json.get("ports"))
response.set_status(201)
response.json(node)
@Route.get(
r"/projects/{project_id}/nat/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
200: "Success",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Get a nat instance",
output=NAT_OBJECT_SCHEMA)
def show(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.json(node)
@Route.put(
r"/projects/{project_id}/nat/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
200: "Instance updated",
400: "Invalid request",
404: "Instance doesn't exist",
409: "Conflict"
},
description="Update a nat instance",
input=NAT_UPDATE_SCHEMA,
output=NAT_OBJECT_SCHEMA)
def update(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
for name, value in request.json.items():
if hasattr(node, name) and getattr(node, name) != value:
setattr(node, name, value)
node.updated()
response.json(node)
@Route.delete(
r"/projects/{project_id}/nat/nodes/{node_id}",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance deleted",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Delete a nat instance")
def delete(request, response):
builtin_manager = Builtin.instance()
yield from builtin_manager.delete_node(request.match_info["node_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/start",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance started",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Start a nat")
def start(request, response):
Builtin.instance().get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/stop",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance stopped",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Stop a nat")
def stop(request, response):
Builtin.instance().get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/suspend",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID"
},
status_codes={
204: "Instance suspended",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Suspend a nat")
def suspend(request, response):
Builtin.instance().get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
response.set_status(204)
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter on the nat (always 0)",
"port_number": "Port on the nat"
},
status_codes={
201: "NIO created",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Add a NIO to a nat instance",
input=NIO_SCHEMA,
output=NIO_SCHEMA)
def create_nio(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
nio = builtin_manager.create_nio(request.json)
port_number = int(request.match_info["port_number"])
yield from node.add_nio(nio, port_number)
response.set_status(201)
response.json(nio)
@Route.delete(
r"/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter on the nat (always 0)",
"port_number": "Port on the nat"
},
status_codes={
204: "NIO deleted",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Remove a NIO from a nat instance")
def delete_nio(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
port_number = int(request.match_info["port_number"])
yield from node.remove_nio(port_number)
response.set_status(204)
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/start_capture",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter on the nat (always 0)",
"port_number": "Port on the nat"
},
status_codes={
200: "Capture started",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Start a packet capture on a nat instance",
input=NODE_CAPTURE_SCHEMA)
def start_capture(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
port_number = int(request.match_info["port_number"])
pcap_file_path = os.path.join(node.project.capture_working_directory(), request.json["capture_file_name"])
yield from node.start_capture(port_number, pcap_file_path, request.json["data_link_type"])
response.json({"pcap_file_path": pcap_file_path})
@Route.post(
r"/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/stop_capture",
parameters={
"project_id": "Project UUID",
"node_id": "Node UUID",
"adapter_number": "Adapter on the nat (always 0)",
"port_number": "Port on the nat"
},
status_codes={
204: "Capture stopped",
400: "Invalid request",
404: "Instance doesn't exist"
},
description="Stop a packet capture on a nat instance")
def stop_capture(request, response):
builtin_manager = Builtin.instance()
node = builtin_manager.get_node(request.match_info["node_id"], project_id=request.match_info["project_id"])
port_number = int(request.match_info["port_number"])
yield from node.stop_capture(port_number)
response.set_status(204)

View File

@ -15,6 +15,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .port import PORT_OBJECT_SCHEMA
HOST_INTERFACE_SCHEMA = {
"description": "Interfaces on this host",
"properties": {
"name": {
"description": "Interface name",
"type": "string",
"minLength": 1,
},
"type": {
"enum": ["ethernet", "tap"]
},
},
"required": ["name", "type"],
"additionalProperties": False
}
CLOUD_CREATE_SCHEMA = {
@ -22,111 +39,7 @@ CLOUD_CREATE_SCHEMA = {
"description": "Request validation to create a new cloud instance",
"type": "object",
"definitions": {
"EthernetInterfacePort": {
"description": "Ethernet interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["ethernet"]
},
"interface": {
"description": "Ethernet interface name e.g. eth0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
"TAPInterfacePort": {
"description": "TAP interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["tap"]
},
"interface": {
"description": "TAP interface name e.g. tap0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
"UDPTunnelPort": {
"description": "UDP tunnel port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["udp"]
},
"lport": {
"description": "Local UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
},
"rhost": {
"description": "Remote UDP tunnel host",
"type": "string",
"minLength": 1
},
"rport": {
"description": "Remote UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
}
},
"required": ["name", "port_number", "type", "lport", "rhost", "rport"],
"additionalProperties": False
},
"HostInterfaces": {
"description": "Interfaces on this host",
"properties": {
"name": {
"description": "Interface name",
"type": "string",
"minLength": 1,
},
"type": {
"enum": ["Ethernet", "TAP"]
},
},
"required": ["name", "type"],
"additionalProperties": False
},
"HostInterfaces": HOST_INTERFACE_SCHEMA
},
"properties": {
"name": {
@ -146,12 +59,7 @@ CLOUD_CREATE_SCHEMA = {
"ports": {
"type": "array",
"items": [
{"type": "object",
"oneOf": [
{"$ref": "#/definitions/EthernetInterfacePort"},
{"$ref": "#/definitions/TAPInterfacePort"},
{"$ref": "#/definitions/UDPTunnelPort"}
]},
PORT_OBJECT_SCHEMA
]
},
"interfaces": {
@ -173,111 +81,7 @@ CLOUD_OBJECT_SCHEMA = {
"description": "Cloud instance",
"type": "object",
"definitions": {
"EthernetInterfacePort": {
"description": "Ethernet interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["ethernet"]
},
"interface": {
"description": "Ethernet interface name e.g. eth0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
"TAPInterfacePort": {
"description": "TAP interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["tap"]
},
"interface": {
"description": "TAP interface name e.g. tap0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
"UDPTunnelPort": {
"description": "UDP tunnel port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["udp"]
},
"lport": {
"description": "Local UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
},
"rhost": {
"description": "Remote UDP tunnel host",
"type": "string",
"minLength": 1
},
"rport": {
"description": "Remote UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
}
},
"required": ["name", "port_number", "type", "lport", "rhost", "rport"],
"additionalProperties": False
},
"HostInterfaces": {
"description": "Interfaces on this host",
"properties": {
"name": {
"description": "Interface name",
"type": "string",
"minLength": 1,
},
"type": {
"enum": ["ethernet", "tap"]
},
},
"required": ["name", "type"],
"additionalProperties": False
},
"HostInterfaces": HOST_INTERFACE_SCHEMA
},
"properties": {
"name": {
@ -302,12 +106,7 @@ CLOUD_OBJECT_SCHEMA = {
"ports": {
"type": "array",
"items": [
{"type": "object",
"oneOf": [
{"$ref": "#/definitions/EthernetInterfacePort"},
{"$ref": "#/definitions/TAPInterfacePort"},
{"$ref": "#/definitions/UDPTunnelPort"}
]},
PORT_OBJECT_SCHEMA
]
},
"interfaces": {

65
gns3server/schemas/nat.py Normal file
View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .port import PORT_OBJECT_SCHEMA
NAT_OBJECT_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Nat instance",
"type": "object",
"properties": {
"name": {
"description": "Nat name",
"type": "string",
"minLength": 1,
},
"node_id": {
"description": "Node UUID",
"type": "string",
"minLength": 36,
"maxLength": 36,
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
},
"project_id": {
"description": "Project UUID",
"type": "string",
"minLength": 36,
"maxLength": 36,
"pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
},
"status": {
"description": "Node status",
"enum": ["started", "stopped", "suspended"]
},
"ports": {
"type": "array",
"items": [
PORT_OBJECT_SCHEMA
]
},
},
"additionalProperties": False,
"required": ["name", "node_id", "project_id"]
}
NAT_CREATE_SCHEMA = NAT_OBJECT_SCHEMA
NAT_CREATE_SCHEMA["required"] = ["name"]
NAT_UPDATE_SCHEMA = NAT_OBJECT_SCHEMA
del NAT_UPDATE_SCHEMA["required"]

View File

@ -90,6 +90,7 @@ NODE_OBJECT_SCHEMA = {
"node_type": {
"description": "Type of node",
"enum": ["cloud",
"nat",
"ethernet_hub",
"ethernet_switch",
"frame_relay_switch",

115
gns3server/schemas/port.py Normal file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
PORT_OBJECT_SCHEMA = {
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "A port use in the cloud",
"type": "object",
"oneOf": [
{
"description": "Ethernet interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["ethernet"]
},
"interface": {
"description": "Ethernet interface name e.g. eth0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
{
"description": "TAP interface port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["tap"]
},
"interface": {
"description": "TAP interface name e.g. tap0",
"type": "string",
"minLength": 1
},
},
"required": ["name", "port_number", "type", "interface"],
"additionalProperties": False
},
{
"description": "UDP tunnel port",
"properties": {
"name": {
"description": "Port name",
"type": "string",
"minLength": 1,
},
"port_number": {
"description": "Port number",
"type": "integer",
"minimum": 1
},
"type": {
"description": "Port type",
"enum": ["udp"]
},
"lport": {
"description": "Local UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
},
"rhost": {
"description": "Remote UDP tunnel host",
"type": "string",
"minLength": 1
},
"rport": {
"description": "Remote UDP tunnel port",
"type": "integer",
"minimum": 1,
"maximum": 65535
}
},
"required": ["name", "port_number", "type", "lport", "rhost", "rport"],
"additionalProperties": False
}
]
}

View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
#
# Copyright (C) 2016 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid
import pytest
from unittest.mock import MagicMock
from gns3server.compute.builtin.nodes.nat import Nat
def test_json(on_gns3vm, project):
nat = Nat("nat1", str(uuid.uuid4()), project, MagicMock())
assert nat.__json__() == {
"name": "nat1",
"node_id": nat.id,
"project_id": project.id,
"status": "started",
"ports": [
{
"interface": "eth1",
"name": "nat0",
"port_number": 1,
"type": "ethernet"
}
]
}

View File

@ -315,3 +315,12 @@ def async_run(loop):
Shortcut for running in asyncio loop
"""
return lambda x: loop.run_until_complete(asyncio.async(x))
@pytest.yield_fixture
def on_gns3vm():
"""
Mock the hostname to emulate the GNS3 VM
"""
with patch("socket.gethostname", return_value="gns3vm"):
yield

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import sys
import os
from tests.utils import asyncio_patch
from unittest.mock import patch
@pytest.fixture(scope="function")
def vm(http_compute, project):
response = http_compute.post("/projects/{project_id}/cloud/nodes".format(project_id=project.id), {"name": "Cloud 1"})
assert response.status == 201
return response.json
@pytest.yield_fixture(autouse=True)
def mock_ubridge():
"""
Avoid all interaction with ubridge
"""
with asyncio_patch("gns3server.compute.builtin.nodes.cloud.Cloud._start_ubridge"):
with asyncio_patch("gns3server.compute.builtin.nodes.cloud.Cloud._add_ubridge_connection"):
with asyncio_patch("gns3server.compute.builtin.nodes.cloud.Cloud._delete_ubridge_connection"):
yield
def test_cloud_create(http_compute, project):
response = http_compute.post("/projects/{project_id}/cloud/nodes".format(project_id=project.id), {"name": "Cloud 1"}, example=True)
assert response.status == 201
assert response.route == "/projects/{project_id}/cloud/nodes"
assert response.json["name"] == "Cloud 1"
assert response.json["project_id"] == project.id
def test_cloud_get(http_compute, project, vm):
response = http_compute.get("/projects/{project_id}/cloud/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 200
assert response.route == "/projects/{project_id}/cloud/nodes/{node_id}"
assert response.json["name"] == "Cloud 1"
assert response.json["project_id"] == project.id
assert response.json["status"] == "started"
def test_cloud_nio_create_udp(http_compute, vm):
response = http_compute.post("/projects/{project_id}/cloud/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242,
"rport": 4343,
"rhost": "127.0.0.1"},
example=True)
assert response.status == 201
assert response.route == "/projects/{project_id}/cloud/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
assert response.json["type"] == "nio_udp"
def test_cloud_delete_nio(http_compute, vm):
http_compute.post("/projects/{project_id}/cloud/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242,
"rport": 4343,
"rhost": "127.0.0.1"})
response = http_compute.delete("/projects/{project_id}/cloud/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 204
assert response.route == "/projects/{project_id}/cloud/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
def test_cloud_delete(http_compute, vm):
response = http_compute.delete("/projects/{project_id}/cloud/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 204
def test_cloud_update(http_compute, vm, tmpdir):
response = http_compute.put("/projects/{project_id}/cloud/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), {
"name": "test"
},
example=True)
assert response.status == 200
assert response.json["name"] == "test"

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytest
import sys
import os
from tests.utils import asyncio_patch
from unittest.mock import patch
@pytest.fixture(scope="function")
def vm(http_compute, project, on_gns3vm):
response = http_compute.post("/projects/{project_id}/nat/nodes".format(project_id=project.id), {"name": "Nat 1"})
assert response.status == 201
return response.json
@pytest.yield_fixture(autouse=True)
def mock_ubridge():
"""
Avoid all interaction with ubridge
"""
with asyncio_patch("gns3server.compute.builtin.nodes.nat.Nat._start_ubridge"):
with asyncio_patch("gns3server.compute.builtin.nodes.nat.Nat._add_ubridge_connection"):
with asyncio_patch("gns3server.compute.builtin.nodes.nat.Nat._delete_ubridge_connection"):
yield
def test_nat_create(http_compute, project, on_gns3vm):
response = http_compute.post("/projects/{project_id}/nat/nodes".format(project_id=project.id), {"name": "Nat 1"}, example=True)
assert response.status == 201
assert response.route == "/projects/{project_id}/nat/nodes"
assert response.json["name"] == "Nat 1"
assert response.json["project_id"] == project.id
def test_nat_get(http_compute, project, vm):
response = http_compute.get("/projects/{project_id}/nat/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 200
assert response.route == "/projects/{project_id}/nat/nodes/{node_id}"
assert response.json["name"] == "Nat 1"
assert response.json["project_id"] == project.id
assert response.json["status"] == "started"
def test_nat_nio_create_udp(http_compute, vm):
response = http_compute.post("/projects/{project_id}/nat/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242,
"rport": 4343,
"rhost": "127.0.0.1"},
example=True)
assert response.status == 201
assert response.route == "/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
assert response.json["type"] == "nio_udp"
def test_nat_delete_nio(http_compute, vm):
http_compute.post("/projects/{project_id}/nat/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), {"type": "nio_udp",
"lport": 4242,
"rport": 4343,
"rhost": "127.0.0.1"})
response = http_compute.delete("/projects/{project_id}/nat/nodes/{node_id}/adapters/0/ports/0/nio".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 204
assert response.route == "/projects/{project_id}/nat/nodes/{node_id}/adapters/{adapter_number:\d+}/ports/{port_number:\d+}/nio"
def test_nat_delete(http_compute, vm):
response = http_compute.delete("/projects/{project_id}/nat/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), example=True)
assert response.status == 204
def test_nat_update(http_compute, vm, tmpdir):
response = http_compute.put("/projects/{project_id}/nat/nodes/{node_id}".format(project_id=vm["project_id"], node_id=vm["node_id"]), {
"name": "test"
},
example=True)
assert response.status == 200
assert response.json["name"] == "test"