Base server complete including modules, STOMP protocol over Websockets

and ZMQ Router/Dealer implementations.
This commit is contained in:
grossmj 2013-12-05 00:21:06 -07:00
parent 58f93edaf7
commit f4e51ea74f
20 changed files with 1153 additions and 170 deletions

View File

@ -1,9 +1,7 @@
language: python
python:
- "2.6"
- "2.7"
- "pypy"
- "3.3"
install:

4
dev-requirements.txt Normal file
View File

@ -0,0 +1,4 @@
-rrequirements.txt
pytest
ws4py

View File

@ -23,8 +23,6 @@
# or negative for a release candidate or beta (after the base version
# number has been incremented)
from gns3server.plugin_manager import PluginManager
from gns3server.server import Server
__version__ = "0.1.dev"
__version_info__ = (0, 1, 0, -99)
from .module_manager import ModuleManager
from .server import Server
from .version import __version__

View File

@ -24,10 +24,15 @@ import tornado.options
# command line options
from tornado.options import define
define("host", default="127.0.0.1", help="run on the given host/IP address", type=str)
define("port", default=8000, help="run on the given port", type=int)
define("ipc", default=False, help="use IPC for module communication", type=bool)
def main():
"""
Entry point for GNS3 server
"""
current_year = datetime.date.today().year
print("GNS3 server version {}".format(gns3server.__version__))
@ -45,13 +50,15 @@ def main():
tornado.options.print_help()
raise SystemExit
#FIXME: log everything for now (excepting DEBUG)
# FIXME: log everything for now (excepting DEBUG)
logging.basicConfig(level=logging.INFO)
server = gns3server.Server()
server.load_plugins()
from tornado.options import options
server = gns3server.Server(options.host,
options.port,
ipc=options.ipc)
server.load_modules()
server.run()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import imp
import inspect
import pkgutil
from .modules import IModule
import logging
log = logging.getLogger(__name__)
class Module(object):
"""
Module representation for the module manager
:param name: module name
:param cls: module class to be instantiated when
the module is activated
"""
def __init__(self, name, cls):
self._name = name
self._cls = cls
@property
def name(self):
return self._name
@name.setter
def name(self, new_name):
self._name = new_name
#@property
def cls(self):
return self._cls
class ModuleManager(object):
"""
Manages modules
:param module_paths: path from where module are loaded
"""
def __init__(self, module_paths=['modules']):
self._modules = []
self._module_paths = module_paths
def load_modules(self):
"""
Finds all the possible modules (classes with IModule as a parent)
"""
for _, name, ispkg in pkgutil.iter_modules(self._module_paths):
if (ispkg):
log.debug("analyzing {} package directory".format(name))
try:
file, pathname, description = imp.find_module(name, self._module_paths)
module = imp.load_module(name, file, pathname, description)
classes = inspect.getmembers(module, inspect.isclass)
for module_class in classes:
if issubclass(module_class[1], IModule):
# make sure the module class has IModule as a parent
if module_class[1].__module__ == name:
log.info("found and loading {} module".format(module_class[0].lower()))
info = Module(name=module_class[0].lower(), cls=module_class[1])
self._modules.append(info)
except:
log.warning("error while analyzing {} package directory".format(name))
finally:
if file:
file.close()
def get_all_modules(self):
"""
Returns all modules.
:return: list of Module objects
"""
return self._modules
def activate_module(self, module, args=(), kwargs={}):
"""
Activates a given module.
:param module: module to activate (Module object)
:param args: args passed to the module
:param kwargs: kwargs passed to the module
:return: instantiated module class
"""
module_class = module.cls()
module_instance = module_class(name=module.name, args=args, kwargs={})
log.info("activating {} module".format(module.name))
return module_instance

View File

@ -15,4 +15,4 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from gns3server.plugins.base import IPlugin
from .base import IModule

149
gns3server/modules/base.py Normal file
View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import multiprocessing
import zmq
import logging
log = logging.getLogger(__name__)
class IModule(multiprocessing.Process):
"""
Module interface
"""
destination = {}
def __init__(self, name=None, args=(), kwargs={}):
multiprocessing.Process.__init__(self,
name=name,
args=args,
kwargs=kwargs)
self._context = None
self._ioloop = None
self._stream = None
self._host = args[0]
self._port = args[1]
self._current_session = None
self._current_destination = None
def setup(self):
"""
Sets up PyZMQ and creates the stream to handle requests
"""
self._context = zmq.Context()
self._ioloop = zmq.eventloop.ioloop.IOLoop.instance()
self._stream = self.create_stream(self._host, self._port, self.decode_request)
def create_stream(self, host=None, port=0, callback=None):
"""
Creates a new ZMQ stream
"""
socket = self._context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, self.name.encode("utf-8"))
if host and port:
log.info("ZeroMQ client ({}) connecting to {}:{}".format(self.name, host, port))
try:
socket.connect("tcp://{}:{}".format(host, port))
except zmq.error.ZMQError as e:
log.critical("Could not connect to ZeroMQ server on {}:{}, reason: {}".format(host, port, e))
raise SystemExit
else:
log.info("ZeroMQ client ({}) connecting to ipc:///tmp/gns3.ipc".format(self.name))
try:
socket.connect("ipc:///tmp/gns3.ipc")
except zmq.error.ZMQError as e:
log.critical("Could not connect to ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e))
raise SystemExit
stream = zmq.eventloop.zmqstream.ZMQStream(socket, self._ioloop)
if callback:
stream.on_recv(callback)
return stream
def run(self):
"""
Sets up everything and starts the event loop
"""
self.setup()
try:
self._ioloop.start()
except KeyboardInterrupt:
return
def stop(self):
"""
Stops the event loop
"""
#zmq.eventloop.ioloop.IOLoop.instance().stop()
self._ioloop.stop()
def send_response(self, response):
"""
Sends a response back to the requester
"""
# add session and destination to the response
response = [self._current_session, self._current_destination, response]
log.debug("ZeroMQ client ({}) sending: {}".format(self.name, response))
self._stream.send_json(response)
def decode_request(self, request):
"""
Decodes the request to JSON
"""
try:
request = zmq.utils.jsonapi.loads(request[0])
except ValueError:
self.send_response("ValueError") # FIXME: explicit json error
return
log.debug("ZeroMQ client ({}) received: {}".format(self.name, request))
self._current_session = request[0]
self._current_destination = request[1]
if self._current_destination not in self.destination:
# FIXME: return error if destination not found!
return
log.debug("Routing request to {}: {}".format(self._current_destination, request[2]))
self.destination[self._current_destination](self, request[2])
def destinations(self):
"""
Channels handled by this modules.
"""
return self.destination.keys()
@classmethod
def route(cls, destination):
"""
Decorator to register a destination routed to a method
"""
def wrapper(method):
cls.destination[destination] = method
return method
return wrapper

View File

@ -1,87 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import imp
import inspect
import pkgutil
import logging
from gns3server.plugins import IPlugin
logger = logging.getLogger(__name__)
class Plugin(object):
"""Plugin representation for the PluginManager
"""
def __init__(self, name, cls):
self._name = name
self._cls = cls
@property
def name(self):
return self._name
@name.setter
def name(self, new_name):
self._name = new_name
#@property
def cls(self):
return self._cls
class PluginManager(object):
"""Manages plugins
"""
def __init__(self, plugin_paths=['plugins']):
self._plugins = []
self._plugin_paths = plugin_paths
def load_plugins(self):
for _, name, ispkg in pkgutil.iter_modules(self._plugin_paths):
if (ispkg):
logger.info("analyzing '{}' package".format(name))
try:
file, pathname, description = imp.find_module(name, self._plugin_paths)
plugin_module = imp.load_module(name, file, pathname, description)
plugin_classes = inspect.getmembers(plugin_module, inspect.isclass)
for plugin_class in plugin_classes:
if issubclass(plugin_class[1], IPlugin):
# don't instantiate any parent plugins
if plugin_class[1].__module__ == name:
logger.info("loading '{}' plugin".format(plugin_class[0]))
info = Plugin(name=plugin_class[0], cls=plugin_class[1])
self._plugins.append(info)
finally:
if file:
file.close()
def get_all_plugins(self):
return self._plugins
def activate_plugin(self, plugin):
plugin_class = plugin.cls()
plugin_instance = plugin_class()
logger.info("'{}' plugin activated".format(plugin.name))
return plugin_instance

View File

@ -15,66 +15,142 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import zmq
from zmq.eventloop import ioloop, zmqstream
ioloop.install()
import os
import functools
import socket
import tornado.ioloop
import tornado.web
import gns3server
import tornado.autoreload
from .version import __version__
from .stomp_websocket import StompWebSocket
from .module_manager import ModuleManager
logger = logging.getLogger(__name__)
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Welcome to the GNS3 server!")
import logging
log = logging.getLogger(__name__)
class VersionHandler(tornado.web.RequestHandler):
def get(self):
response = {'version': gns3server.__version__}
response = {'version': __version__}
self.write(response)
class Server(object):
# built-in handlers
handlers = [(r"/", MainHandler),
(r"/version", VersionHandler)]
handlers = [(r"/version", VersionHandler)]
def __init__(self):
def __init__(self, host, port, ipc=False):
self._plugins = []
self._host = host
self._port = port
if ipc:
self._zmq_port = 0 # this forces module to use IPC for communications
else:
self._zmq_port = port + 1 # this server port + 1
self._ipc = ipc
self._modules = []
def load_plugins(self):
def load_modules(self):
"""Loads the plugins
"""
plugin_manager = gns3server.PluginManager()
plugin_manager.load_plugins()
for plugin in plugin_manager.get_all_plugins():
instance = plugin_manager.activate_plugin(plugin)
self._plugins.append(instance)
plugin_handlers = instance.handlers()
self.handlers.extend(plugin_handlers)
cwd = os.path.dirname(os.path.abspath(__file__))
module_path = os.path.join(cwd, 'modules')
log.info("loading modules from {}".format(module_path))
module_manager = ModuleManager([module_path])
module_manager.load_modules()
for module in module_manager.get_all_modules():
instance = module_manager.activate_module(module, ("127.0.0.1", self._zmq_port))
self._modules.append(instance)
destinations = instance.destinations()
for destination in destinations:
StompWebSocket.register_destination(destination, module.name)
instance.start() # starts the new process
def run(self):
"""Starts the tornado web server
"""
Starts the Tornado web server and ZeroMQ server
"""
from tornado.options import options
tornado_app = tornado.web.Application(self.handlers)
router = self._create_zmq_router()
# Add our Stomp Websocket handler to Tornado
self.handlers.extend([(r"/", StompWebSocket, dict(zmq_router=router))])
tornado_app = tornado.web.Application(self.handlers, debug=True) # FIXME: debug mode!
try:
port = options.port
print("Starting server on port {}".format(port))
tornado_app.listen(port)
print("Starting server on port {}".format(self._port))
tornado_app.listen(self._port)
except socket.error as e:
if e.errno is 48: # socket already in use
logging.critical("socket in use for port {}".format(port))
logging.critical("socket in use for port {}".format(self._port))
raise SystemExit
ioloop = tornado.ioloop.IOLoop.instance()
stream = zmqstream.ZMQStream(router, ioloop)
stream.on_recv(StompWebSocket.dispatch_message)
tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False))
try:
tornado.ioloop.IOLoop.instance().start()
ioloop.start()
except (KeyboardInterrupt, SystemExit):
print("\nExiting...")
tornado.ioloop.IOLoop.instance().stop()
self._cleanup()
def _create_zmq_router(self):
"""
Creates the ZeroMQ router socket to send
requests to modules.
:returns: ZeroMQ socket
"""
context = zmq.Context()
context.linger = 0
router = context.socket(zmq.ROUTER)
if self._ipc:
try:
router.bind("ipc:///tmp/gns3.ipc")
except zmq.error.ZMQError as e:
log.critical("Could not start ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e))
self._cleanup()
raise SystemExit
log.info("ZeroMQ server listening to ipc:///tmp/gns3.ipc")
else:
try:
router.bind("tcp://127.0.0.1:{}".format(self._zmq_port))
except zmq.error.ZMQError as e:
log.critical("Could not start ZeroMQ server on 127.0.0.1:{}, reason: {}".format(self._zmq_port, e))
self._cleanup()
raise SystemExit
log.info("ZeroMQ server listening to 127.0.0.1:{}".format(self._zmq_port))
return router
def _cleanup(self, stop=True):
"""
Shutdowns running module processes
and close remaining Tornado ioloop file descriptors
:param stop: Stop the ioloop if True (default)
"""
# terminate all modules
for module in self._modules:
log.info("terminating {}".format(module.name))
module.terminate()
module.join(timeout=1)
ioloop = tornado.ioloop.IOLoop.instance()
# close any fd that would have remained open...
for fd in ioloop._handlers.keys():
try:
os.close(fd)
except Exception:
pass
if stop:
ioloop.stop()

159
gns3server/stomp/frame.py Normal file
View File

@ -0,0 +1,159 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
STOMP frame representation, decoding and encoding
http://stomp.github.io/stomp-specification-1.2.html
Adapted from Jason R. Briggs's code
https://github.com/jasonrbriggs/stomp.py
"""
import re
from .utils import encode
class Frame(object):
"""
A STOMP frame. Comprises a command, the headers and the body content.
"""
# Used to parse STOMP header lines in the format "key:value",
HEADER_LINE_RE = re.compile('(?P<key>[^:]+)[:](?P<value>.*)')
# As of STOMP 1.2, lines can end with either line feed, or carriage return plus line feed.
PREAMBLE_END_RE = re.compile('\n\n|\r\n\r\n')
# As of STOMP 1.2, lines can end with either line feed, or carriage return plus line feed.
LINE_END_RE = re.compile('\n|\r\n')
# NULL value
NULL = b'\x00'
def __init__(self, cmd=None, headers={}, body=None):
self._cmd = cmd
self._headers = headers
self._body = body
@property
def cmd(self):
return(self._cmd)
@cmd.setter
def cmd(self, cmd):
self._cmd = cmd
@property
def headers(self):
return(self._headers)
@headers.setter
def headers(self, headers):
self._headers = headers
@property
def body(self):
return(self._body)
@body.setter
def body(self, body):
self._body = body
def encode(self):
"""
Encodes this frame to be send on the wire
"""
lines = []
if self._cmd:
lines.append(self._cmd)
lines.append("\n")
for key, vals in sorted(self._headers.items()):
if type(vals) != tuple:
vals = (vals,)
for val in vals:
lines.append("%s:%s\n" % (key, val))
lines.append("\n")
if self._body:
lines.append(self._body)
if self._cmd:
lines.append(self.NULL)
encoded_lines = (encode(line) for line in lines)
return b''.join(encoded_lines)
@classmethod
def parse_headers(cls, lines, offset=0):
"""
Parses frame headers
:param lines: Frame preamble lines
:param offset: To start parsing at the given offset
:returns: Headers in dict header:value
"""
headers = {}
for header_line in lines[offset:]:
header_match = cls.HEADER_LINE_RE.match(header_line)
if header_match:
key = header_match.group('key')
if key not in headers:
headers[key] = header_match.group('value')
return headers
@classmethod
def parse_frame(cls, frame):
"""
Parses a frame
:params frame: The frame data to be parsed
:returns: STOMP Frame object
"""
f = Frame()
# End-of-line (EOL) indicates an heart beat frame
if frame == '\x0a':
f.cmd = 'heartbeat' # This will have the frame ignored
return f
mat = cls.PREAMBLE_END_RE.search(frame)
preamble_end = -1
if mat:
preamble_end = mat.start()
if preamble_end == -1:
preamble_end = len(frame)
preamble = frame[0:preamble_end]
preamble_lines = cls.LINE_END_RE.split(preamble)
f.body = frame[preamble_end + 2:]
if f.body[-1] == '\x00':
f.body = f.body[:-1]
# Skip any leading newlines
first_line = 0
while first_line < len(preamble_lines) and len(preamble_lines[first_line]) == 0:
first_line += 1
# Extract frame type/command
f.cmd = preamble_lines[first_line]
# Put headers into a key/value map
f.headers = cls.parse_headers(preamble_lines, first_line + 1)
return f

View File

@ -0,0 +1,227 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Basic STOMP 1.2 protocol implementation
http://stomp.github.io/stomp-specification-1.2.html
"""
import uuid
from .frame import Frame
from .utils import encode, hasbyte
# Commands server-side
CMD_CONNECTED = 'CONNECTED'
CMD_ERROR = 'ERROR'
CMD_MESSAGE = 'MESSAGE'
CMD_RECEIPT = 'RECEIPT'
# Commands client-side
CMD_STOMP = 'STOMP'
CMD_CONNECT = 'CONNECT'
CMD_DISCONNECT = 'DISCONNECT'
CMD_SEND = 'SEND'
# Commands not supported
CMD_SUBSCRIBE = 'SUBSCRIBE'
CMD_UNSUBSCRIBE = 'UNSUBSCRIBE'
CMD_ACK = 'ACK'
CMD_NACK = 'NACK'
CMD_BEGIN = 'BEGIN'
CMD_ABORT = 'ABORT'
# Headers
HDR_VERSION = 'version'
HDR_SESSION = 'session'
HDR_SERVER = 'server'
HDR_CONTENT_TYPE = 'content-type'
HDR_CONTENT_LENGTH = 'content-length'
HDR_RECEIPT_ID = 'receipt-id'
HDR_MESSAGE = 'message'
HDR_MESSAGE_ID = 'message-id'
HDR_ACCEPT_VERSION = 'accept-version'
HDR_HOST = 'host'
HDR_DESTINATION = 'destination'
HDR_RECEIPT = 'receipt'
# Headers not supported
HDR_HEARTBEAT = 'heart-beat'
HDR_LOGIN = 'login'
HDR_PASSCODE = 'passcode'
HDR_ID = 'id'
HDR_ACK = 'ack'
HDR_SUBSCRIPTION = 'subscription'
HDR_TRANSACTION = 'transaction'
class serverProtocol(object):
"""
STOMP 1.2 protocol support for servers.
"""
def __init__(self):
# STOMP protocol version
self.version = 1.2
def connected(self, session=None, server=None):
"""
Replies to the CONNECT or STOMP command.
Heart-beat header is not supported.
:param session: A session identifier that uniquely identifies the session.
:param server: A field that contains information about the STOMP server.
:returns: STOMP Frame object
"""
# Version header is required
headers = {HDR_VERSION: self.version}
if session:
headers[HDR_SESSION] = session
# The server-name field consists of a name token followed by an
# optional version number token. Example: Apache/1.3.9
if server:
headers[HDR_SERVER] = server
return Frame(CMD_CONNECTED, headers).encode()
def message(self, destination, body, content_type=None, message_id=str(uuid.uuid4())):
"""
Sends a message to a STOMP client.
:param destination: Destination string
:param body: Data to be added in the frame body
:param content_type: MIME type which describes the format of the body
:param message_id: Unique identifier for that message
:returns: STOMP Frame object
"""
# Destination and message id headers are required
headers = {HDR_DESTINATION: destination,
HDR_MESSAGE_ID: message_id}
# Subscription is required but not implemented on this server
headers[HDR_SUBSCRIPTION] = 0
if content_type:
headers[HDR_CONTENT_TYPE] = content_type
body = encode(body)
if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body):
headers[HDR_CONTENT_LENGTH] = len(body)
return Frame(CMD_MESSAGE, headers, body).encode()
def receipt(self, receipt_id):
"""
Sends an acknowledgment for client frame that requests a receipt.
:param receipt_id: Receipt ID to send back to the client
:returns: STOMP Frame object
"""
# Receipt ID header is required (the same sent in the client frame)
headers = {HDR_RECEIPT_ID: receipt_id}
return Frame(CMD_RECEIPT, headers).encode()
def error(self, message='', body='', content_type=None):
"""
Sends an error to the client if something goes wrong.
:param message: Short description of the error
:param body: Detailed information
:param content_type: MIME type which describes the format of the body
:returns: STOMP Frame object
"""
headers = {}
if message:
headers[HDR_MESSAGE] = message
if body:
body = encode(body)
if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body):
headers[HDR_CONTENT_LENGTH] = len(body)
if content_type:
headers[HDR_CONTENT_TYPE] = content_type
return Frame(CMD_ERROR, headers, body).encode()
class clientProtocol(object):
"""
STOMP 1.2 protocol support for clients.
"""
def connect(self, host, accept_version='1.2'):
"""
Connects to a STOMP server.
Heart-beat, login and passcode headers are not supported.
:param host: Host name that the socket was established against.
:param accept_version: The versions of the STOMP protocol the client supports.
:returns: STOMP Frame object
"""
# Currently only STOMP 1.2 is supported (required header)
headers = {HDR_ACCEPT_VERSION: accept_version}
if host:
headers[HDR_HOST] = host
# The STOMP command is not backward compatible with STOMP 1.0 servers.
# Clients that use the STOMP frame instead of the CONNECT frame will
# only be able to connect to STOMP 1.2 servers (as well as some STOMP 1.1 servers.
return Frame(CMD_STOMP, headers).encode()
def disconnect(self, receipt=str(uuid.uuid4())):
"""
Disconnects to a STOMP server.
:param receipt: unique identifier
:returns: STOMP Frame object
"""
# Receipt header is required
headers = {HDR_RECEIPT: receipt}
return Frame(CMD_DISCONNECT, headers).encode()
def send(self, destination, body, content_type=None):
"""
Sends a message to a destination in the messaging system.
Transaction header is not supported.
User defined headers are not supported too (against the protocol specification)
:param destination: Destination string
:param body: Data to be added in the frame body
:param content_type: MIME type which describes the format of the body
:returns: STOMP Frame object
"""
# Destination header is required
headers = {HDR_DESTINATION: destination}
if content_type:
headers[HDR_CONTENT_TYPE] = content_type
body = encode(body)
if HDR_CONTENT_LENGTH not in headers and hasbyte(0, body):
headers[HDR_CONTENT_LENGTH] = len(body)
return Frame(CMD_SEND, headers, body).encode()

View File

@ -15,28 +15,29 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import tornado.web
from gns3server.plugins import IPlugin
"""
Utilitary functions for STOMP implementation
"""
logger = logging.getLogger(__name__)
import sys
PY2 = sys.version_info[0] == 2
if not PY2:
def encode(char_data):
if type(char_data) is str:
return char_data.encode()
elif type(char_data) is bytes:
return char_data
else:
raise TypeError('message should be a string or bytes')
else:
def encode(char_data):
if type(char_data) is unicode:
return char_data.encode('utf-8')
else:
return char_data
class TestHandler(tornado.web.RequestHandler):
def get(self):
self.write("This is my test handler")
class Dynamips(IPlugin):
def __init__(self):
IPlugin.__init__(self)
logger.info("Dynamips plugin is initializing")
def handlers(self):
"""Returns tornado web request handlers that the plugin manages
:returns: List of tornado.web.RequestHandler
"""
return [(r"/test", TestHandler)]
def hasbyte(byte, byte_data):
return bytes([byte]) in byte_data

View File

@ -0,0 +1,230 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 GNS3 Technologies Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
STOMP protocol over Websockets
"""
import zmq
import uuid
import tornado.websocket
from .version import __version__
from tornado.escape import json_decode
from .stomp import frame as stomp_frame
from .stomp import protocol as stomp_protocol
import logging
log = logging.getLogger(__name__)
class StompWebSocket(tornado.websocket.WebSocketHandler):
"""
STOMP protocol over Tornado Websockets with message
routing to ZeroMQ dealer clients.
:param application: Tornado Application object
:param request: Tornado Request object
:param zmq_router: ZeroMQ router socket
"""
clients = set()
destinations = {}
stomp = stomp_protocol.serverProtocol()
def __init__(self, application, request, zmq_router):
tornado.websocket.WebSocketHandler.__init__(self, application, request)
self._session_id = str(uuid.uuid4())
self.zmq_router = zmq_router
@property
def session_id(self):
"""
Session ID uniquely representing a Websocket client
"""
return self._session_id
@classmethod
def dispatch_message(cls, message):
"""
Sends a message to Websocket client
:param message: message from a module
"""
# Module name that is replying
module = message[0].decode("utf-8")
# ZMQ requests are encoded in JSON
# format is a JSON array: [session ID, destination, JSON dict]
json_message = json_decode(message[1])
session_id = json_message[0]
destination = json_message[1]
content = json_message[2]
log.debug("Received message from module {}: {}".format(module,
json_message))
stomp_msg = cls.stomp.message(destination,
content,
"application/json")
for client in cls.clients:
if client.session_id == session_id:
client.write_message(stomp_msg)
@classmethod
def register_destination(cls, destination, module):
"""
Registers a destination handled by a module.
Used to route requests to the right module.
:param destination: destination string
:param module: module string
"""
# Make sure the destination is not already registered
# by another module for instance
assert destination not in cls.destinations
log.info("registering {} as a destination for {}".format(destination,
module))
cls.destinations[destination] = module
def stomp_handle_connect(self, frame):
"""
Handles a STOMP CONNECT frame and returns a STOMP CONNECTED frame.
:param frame: received STOMP CONNECT frame (object)
"""
if not stomp_protocol.HDR_ACCEPT_VERSION in frame.headers or \
not str(self.stomp.version) in frame.headers[stomp_protocol.HDR_ACCEPT_VERSION]:
self.stomp_error("STOMP version error",
"Supported protocol version is {}".format(self.stomp.version),)
else:
self.write_message(self.stomp.connected(self.session_id,
'gns3server/' + __version__))
def stomp_handle_send(self, frame):
"""
Handles a STOMP SEND frame and dispatches it to the right module
based on the destination.
:param frame: received STOMP SEND frame (object)
"""
if stomp_protocol.HDR_DESTINATION not in frame.headers:
self.stomp_error("No destination header in SEND frame")
return
destination = frame.headers[stomp_protocol.HDR_DESTINATION]
if not destination:
self.stomp_error("Destination header is empty in SEND frame")
return
if destination not in self.destinations:
self.stomp_error("Destination {} doesn't exist".format(destination))
return
if not frame.body:
self.stomp_error("SEND frame has no body")
return
module = self.destinations[destination]
# ZMQ requests are encoded in JSON
# format is a JSON array: [session ID, destination, JSON dict]
zmq_request = [self.session_id, destination, frame.body]
# Route to the correct module
self.zmq_router.send_string(module, zmq.SNDMORE)
# Send the encoded JSON request
self.zmq_router.send_json(zmq_request)
def stomp_handle_disconnect(self, frame):
"""
Sends an STOMP RECEIPT frame back to the client when receiving a disconnection
request and close the connection.
:param frame: received STOMP DISCONNECT frame (object)
"""
if stomp_protocol.HDR_RECEIPT not in frame.headers:
self.stomp_error("No receipt header in DISCONNECT frame")
return
receipt = self.stomp.receipt(frame.headers[stomp_protocol.HDR_RECEIPT])
self.write_message(receipt)
self.close()
log.info("Websocket client {} gracefully disconnected".format(self.session_id))
self.clients.remove(self)
def stomp_error(self, short_description='', detailed_info='', content_type="text/plain"):
"""
Sends an STOMP error message back to the client and close the connection.
:param short_description: short description of the error
:param detailed_info: detailed description of the error
:param content_type: MIME type which describes the format of the detailed info
"""
error = self.stomp.error(short_description, detailed_info, content_type)
self.write_message(error)
self.close()
log.warning("Websocket client {} disconnected on an error: {}".format(self.session_id,
short_description))
self.clients.remove(self)
def open(self):
"""
Invoked when a new WebSocket is opened.
"""
log.info("Websocket client {} connected".format(self.session_id))
self.clients.add(self)
def on_message(self, message):
"""
Handles incoming messages.
:param message: message received over the Websocket
"""
log.debug("Received Websocket message: {}".format(message))
try:
frame = stomp_frame.Frame.parse_frame(message)
except Exception:
self.stomp_error("Malformed STOMP frame")
return
if frame.cmd == stomp_protocol.CMD_STOMP or frame.cmd == stomp_protocol.CMD_CONNECT:
self.stomp_handle_connect(frame)
elif frame.cmd == stomp_protocol.CMD_SEND:
self.stomp_handle_send(frame)
elif frame.cmd == stomp_protocol.CMD_DISCONNECT:
self.stomp_handle_disconnect(frame)
else:
self.stomp_error("STOMP frame not implemented")
def on_close(self):
"""
Invoked when the WebSocket is closed.
"""
log.info("Websocket client {} disconnected".format(self.session_id))
self.clients.remove(self)

View File

@ -15,16 +15,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# __version__ is a human-readable version number.
class IPlugin(object):
"""Plugin interface
"""
# __version_info__ is a four-tuple for programmatic comparison. The first
# three numbers are the components of the version number. The fourth
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
def __init__(self):
pass
def setup(self):
"""Called before the plugin is asked to do anything
"""
raise NotImplementedError()
__version__ = "0.1.dev"
__version_info__ = (0, 1, 0, -99)

View File

@ -1,3 +1,2 @@
tornado
jsonschema
networkx
pyzmq

View File

@ -42,10 +42,11 @@ setup(
cmdclass={"test": Tox},
author="Jeremy Grossmann",
author_email="package-maintainer@gns3.net",
description="GNS3 server with HTTP REST API to manage emulators",
description="GNS3 server to asynchronously to manage emulators",
long_description=open("README.rst", "r").read(),
install_requires=[
"tornado >= 2.0",
"pyzmq",
],
entry_points={
"console_scripts": [

16
tests/conftest.py Normal file
View File

@ -0,0 +1,16 @@
import sys
import os
import pytest
import subprocess
import time
@pytest.fixture(scope="session", autouse=True)
def server(request):
cwd = os.path.dirname(os.path.abspath(__file__))
server_script = os.path.join(cwd, "../gns3server/main.py")
process = subprocess.Popen([sys.executable, server_script, "--port=8000"])
time.sleep(0.1) # give some time for the process to start
request.addfinalizer(process.kill)
return process

95
tests/test_stomp.py Normal file
View File

@ -0,0 +1,95 @@
import uuid
from tornado.testing import AsyncTestCase
from tornado.escape import json_encode, json_decode
from ws4py.client.tornadoclient import TornadoWebSocketClient
from gns3server.stomp import frame as stomp_frame
from gns3server.stomp import protocol as stomp_protocol
class Stomp(AsyncTestCase):
URL = "ws://127.0.0.1:8000/"
def setUp(self):
self.stomp = stomp_protocol.clientProtocol()
AsyncTestCase.setUp(self)
def test_connect(self):
request = self.stomp.connect("localhost")
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_CONNECTED
def test_protocol_negotiation_failure(self):
request = self.stomp.connect("localhost", accept_version='1.0')
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_ERROR
def test_malformed_frame(self):
request = b""
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_ERROR
def test_send(self):
destination = "dynamips/echo"
message = {"ping": "test"}
request = self.stomp.send(destination, json_encode(message), "application/json")
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_MESSAGE
assert frame.headers[stomp_protocol.HDR_DESTINATION] == destination
json_reply = json_decode(frame.body)
assert message == json_reply
def test_unimplemented_frame(self):
frame = stomp_frame.Frame(stomp_protocol.CMD_BEGIN)
request = frame.encode()
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_ERROR
def test_disconnect(self):
myid = str(uuid.uuid4())
request = self.stomp.disconnect(myid)
AsyncWSRequest(self.URL, self.io_loop, self.stop, request)
response = self.wait()
assert response
frame = stomp_frame.Frame.parse_frame(response.decode("utf-8"))
assert frame.cmd == stomp_protocol.CMD_RECEIPT
assert frame.headers[stomp_protocol.HDR_RECEIPT_ID] == myid
class AsyncWSRequest(TornadoWebSocketClient):
def __init__(self, url, io_loop, callback, message):
TornadoWebSocketClient.__init__(self, url, io_loop=io_loop)
self._callback = callback
self._message = message
self.connect()
def opened(self):
self.send(self._message, binary=False)
def received_message(self, message):
self.close()
if self._callback:
self._callback(message.data)

View File

@ -1,8 +1,7 @@
[tox]
envlist = py27, pypy, py33
envlist = py27, py33
[testenv]
commands = py.test [] -s tests
deps =
pytest
tornado
deps = -rdev-requirements.txt