Ensure we can't connect to occupy port

Ref https://github.com/GNS3/gns3-gui/issues/1759
This commit is contained in:
Julien Duponchelle 2017-01-06 10:29:56 +01:00
parent 51aef2b9c7
commit 955a466be5
No known key found for this signature in database
GPG Key ID: CE8B29639E07F5E8
3 changed files with 47 additions and 2 deletions

View File

@ -58,6 +58,9 @@ class Link:
"""
port = node.get_port(adapter_number, port_number)
if port.link is not None:
raise aiohttp.web.HTTPConflict(text="Port is already used")
self._link_type = port.link_type
for other_node in self._nodes:
@ -86,6 +89,7 @@ class Link:
"node": node,
"adapter_number": adapter_number,
"port_number": port_number,
"port": port,
"label": label
})
@ -93,6 +97,7 @@ class Link:
yield from self.create()
for n in self._nodes:
n["node"].add_link(self)
n["port"].link = self
self._created = True
self._project.controller.notification.emit("link.created", self.__json__())
@ -123,8 +128,11 @@ class Link:
"""
Delete the link
"""
for port in self._nodes:
port["node"].remove_link(self)
for n in self._nodes:
# It could be different of self if we rollback an already existing link
if n["port"].link == self:
n["port"].link = None
n["node"].remove_link(self)
@asyncio.coroutine
def start_capture(self, data_link_type="DLT_EN10MB", capture_file_name=None):

View File

@ -26,6 +26,18 @@ class Port:
self._adapter_number = adapter_number
self._port_number = port_number
self._name = name
self._link = None
@property
def link(self):
"""
Link connected to the port
"""
return self._link
@link.setter
def link(self, val):
self._link = val
@property
def adapter_number(self):

View File

@ -74,6 +74,7 @@ def test_add_node(async_run, project, compute):
assert link._nodes == [
{
"node": node1,
"port": node1._ports[0],
"adapter_number": 0,
"port_number": 4,
'label': {
@ -100,6 +101,30 @@ def test_add_node(async_run, project, compute):
assert link in node2.link
def test_add_node_already_connected(async_run, project, compute):
"""
Raise an error if we try to use an already connected port
"""
project.dump = AsyncioMagicMock()
node1 = Node(project, compute, "node1", node_type="qemu")
node1._ports = [EthernetPort("E0", 0, 0, 4)]
link = Link(project)
link.create = AsyncioMagicMock()
link._project.controller.notification.emit = MagicMock()
async_run(link.add_node(node1, 0, 4))
node2 = Node(project, compute, "node2", node_type="qemu")
node2._ports = [EthernetPort("E0", 0, 0, 4)]
async_run(link.add_node(node2, 0, 4))
assert link.create.called
link2 = Link(project)
link2.create = AsyncioMagicMock()
with pytest.raises(aiohttp.web.HTTPConflict):
async_run(link2.add_node(node1, 0, 4))
def test_add_node_cloud(async_run, project, compute):
node1 = Node(project, compute, "node1", node_type="qemu")
node1._ports = [EthernetPort("E0", 0, 0, 4)]