Proper server shutdown.

This commit is contained in:
grossmj 2014-04-26 17:51:47 -06:00
parent 223407c596
commit 28ca12367a
5 changed files with 70 additions and 27 deletions

View File

@ -126,6 +126,10 @@ class JSONRPCWebSocket(tornado.websocket.WebSocketHandler):
""" """
log.debug("Received Websocket message: {}".format(message)) log.debug("Received Websocket message: {}".format(message))
if self.zmq_router.closed:
# no need to proceed, the ZeroMQ router has been closed.
return
try: try:
request = json_decode(message) request = json_decode(message)
@ -170,7 +174,7 @@ class JSONRPCWebSocket(tornado.websocket.WebSocketHandler):
# Reset the modules if there are no clients anymore # Reset the modules if there are no clients anymore
# Modules must implement a reset destination # Modules must implement a reset destination
if not self.clients: if not self.clients and not self.zmq_router.closed:
for destination, module in self.destinations.items(): for destination, module in self.destinations.items():
if destination.endswith("reset"): if destination.endswith("reset"):
# Route to the correct module # Route to the correct module

View File

@ -49,6 +49,7 @@ class IModule(multiprocessing.Process):
self._context = None self._context = None
self._ioloop = None self._ioloop = None
self._stream = None self._stream = None
self._dealer = None
self._zmq_host = args[0] # ZeroMQ server address self._zmq_host = args[0] # ZeroMQ server address
self._zmq_port = args[1] # ZeroMQ server port self._zmq_port = args[1] # ZeroMQ server port
self._current_session = None self._current_session = None
@ -72,24 +73,24 @@ class IModule(multiprocessing.Process):
:returns: ZMQ stream instance :returns: ZMQ stream instance
""" """
socket = self._context.socket(zmq.DEALER) self._dealer = self._context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, self.name.encode("utf-8")) self._dealer.setsockopt(zmq.IDENTITY, self.name.encode("utf-8"))
if host and port: if host and port:
log.info("ZeroMQ client ({}) connecting to {}:{}".format(self.name, host, port)) log.info("ZeroMQ client ({}) connecting to {}:{}".format(self.name, host, port))
try: try:
socket.connect("tcp://{}:{}".format(host, port)) self._dealer.connect("tcp://{}:{}".format(host, port))
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
log.critical("Could not connect to ZeroMQ server on {}:{}, reason: {}".format(host, port, e)) log.critical("Could not connect to ZeroMQ server on {}:{}, reason: {}".format(host, port, e))
raise SystemExit raise SystemExit
else: else:
log.info("ZeroMQ client ({}) connecting to ipc:///tmp/gns3.ipc".format(self.name)) log.info("ZeroMQ client ({}) connecting to ipc:///tmp/gns3.ipc".format(self.name))
try: try:
socket.connect("ipc:///tmp/gns3.ipc") self._dealer.connect("ipc:///tmp/gns3.ipc")
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
log.critical("Could not connect to ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) log.critical("Could not connect to ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e))
raise SystemExit raise SystemExit
stream = zmq.eventloop.zmqstream.ZMQStream(socket, self._ioloop) stream = zmq.eventloop.zmqstream.ZMQStream(self._dealer, self._ioloop)
if callback: if callback:
stream.on_recv(callback) stream.on_recv(callback)
return stream return stream
@ -112,7 +113,7 @@ class IModule(multiprocessing.Process):
def signal_handler(signum=None, frame=None): def signal_handler(signum=None, frame=None):
log.warning("Module {} got signal {}, exiting...".format(self.name, signum)) log.warning("Module {} got signal {}, exiting...".format(self.name, signum))
self.stop() self.stop(signum)
signals = [signal.SIGTERM, signal.SIGINT] signals = [signal.SIGTERM, signal.SIGINT]
if not sys.platform.startswith("win"): if not sys.platform.startswith("win"):
@ -131,14 +132,34 @@ class IModule(multiprocessing.Process):
log.info("{} module has stopped".format(self.name)) log.info("{} module has stopped".format(self.name))
def stop(self): def _shutdown(self):
""" """
Stops the event loop. Shuts down the I/O loop and the ZeroMQ stream & socket
"""
self._ioloop.stop()
if self._stream and not self._stream.closed:
# close the ZeroMQ stream
self._stream.close()
if self._dealer and not self._dealer.closed:
# close the ZeroMQ dealer socket
self._dealer.close()
def stop(self, signum=None):
"""
Adds a callback to stop the event loop & ZeroMQ.
:param signum: signal number (if called by the signal handler)
""" """
if not self._stopping: if not self._stopping:
self._stopping = True self._stopping = True
self._ioloop.add_callback_from_signal(self._ioloop.stop) if signum:
self._ioloop.add_callback_from_signal(self._shutdown)
else:
self._shutdown()
def send_response(self, results): def send_response(self, results):
""" """

View File

@ -122,16 +122,19 @@ class Dynamips(IModule):
self._callback = self.add_periodic_callback(self._check_hypervisors, 5000) self._callback = self.add_periodic_callback(self._check_hypervisors, 5000)
self._callback.start() self._callback.start()
def stop(self): def stop(self, signum=None):
""" """
Properly stops the module. Properly stops the module.
:param signum: signal number (if called by the signal handler)
""" """
if not sys.platform.startswith("win32"): if not sys.platform.startswith("win32"):
self._callback.stop() self._callback.stop()
if self._hypervisor_manager: if self._hypervisor_manager:
self._hypervisor_manager.stop_all_hypervisors() self._hypervisor_manager.stop_all_hypervisors()
IModule.stop(self) # this will stop the I/O loop
IModule.stop(self, signum) # this will stop the I/O loop
def _check_hypervisors(self): def _check_hypervisors(self):
""" """

View File

@ -104,9 +104,11 @@ class IOU(IModule):
self._iou_callback = self.add_periodic_callback(self._check_iou_is_alive, 5000) self._iou_callback = self.add_periodic_callback(self._check_iou_is_alive, 5000)
self._iou_callback.start() self._iou_callback.start()
def stop(self): def stop(self, signum=None):
""" """
Properly stops the module. Properly stops the module.
:param signum: signal number (if called by the signal handler)
""" """
self._iou_callback.stop() self._iou_callback.stop()
@ -115,7 +117,7 @@ class IOU(IModule):
iou_instance = self._iou_instances[iou_id] iou_instance = self._iou_instances[iou_id]
iou_instance.delete() iou_instance.delete()
IModule.stop(self) # this will stop the I/O loop IModule.stop(self, signum) # this will stop the I/O loop
def _check_iou_is_alive(self): def _check_iou_is_alive(self):
""" """

View File

@ -55,6 +55,9 @@ class Server(object):
self._host = host self._host = host
self._port = port self._port = port
self._router = None
self._stream = None
if ipc: if ipc:
self._zmq_port = 0 # this forces to use IPC for communications with the ZeroMQ server self._zmq_port = 0 # this forces to use IPC for communications with the ZeroMQ server
else: else:
@ -156,13 +159,13 @@ class Server(object):
self._cleanup() self._cleanup()
ioloop = tornado.ioloop.IOLoop.instance() ioloop = tornado.ioloop.IOLoop.instance()
stream = zmqstream.ZMQStream(router, ioloop) self._stream = zmqstream.ZMQStream(router, ioloop)
stream.on_recv_stream(JSONRPCWebSocket.dispatch_message) self._stream.on_recv_stream(JSONRPCWebSocket.dispatch_message)
tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False)) tornado.autoreload.add_reload_hook(functools.partial(self._cleanup, stop=False))
def signal_handler(signum=None, frame=None): def signal_handler(signum=None, frame=None):
log.warning("Server got signal {}, exiting...".format(signum)) log.warning("Server got signal {}, exiting...".format(signum))
self._cleanup() self._cleanup(signum)
signals = [signal.SIGTERM, signal.SIGINT] signals = [signal.SIGTERM, signal.SIGINT]
if not sys.platform.startswith("win"): if not sys.platform.startswith("win"):
@ -188,10 +191,10 @@ class Server(object):
context = zmq.Context() context = zmq.Context()
context.linger = 0 context.linger = 0
router = context.socket(zmq.ROUTER) self._router = context.socket(zmq.ROUTER)
if self._ipc: if self._ipc:
try: try:
router.bind("ipc:///tmp/gns3.ipc") self._router.bind("ipc:///tmp/gns3.ipc")
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
log.critical("Could not start ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e)) log.critical("Could not start ZeroMQ server on ipc:///tmp/gns3.ipc, reason: {}".format(e))
self._cleanup() self._cleanup()
@ -199,34 +202,41 @@ class Server(object):
log.info("ZeroMQ server listening to ipc:///tmp/gns3.ipc") log.info("ZeroMQ server listening to ipc:///tmp/gns3.ipc")
else: else:
try: try:
router.bind("tcp://127.0.0.1:{}".format(self._zmq_port)) self._router.bind("tcp://127.0.0.1:{}".format(self._zmq_port))
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
log.critical("Could not start ZeroMQ server on 127.0.0.1:{}, reason: {}".format(self._zmq_port, e)) log.critical("Could not start ZeroMQ server on 127.0.0.1:{}, reason: {}".format(self._zmq_port, e))
self._cleanup() self._cleanup()
raise SystemExit raise SystemExit
log.info("ZeroMQ server listening to 127.0.0.1:{}".format(self._zmq_port)) log.info("ZeroMQ server listening to 127.0.0.1:{}".format(self._zmq_port))
return router return self._router
def _shutdown(self): def _shutdown(self):
""" """
Shuts down the I/O loop. Shuts down the I/O loop and the ZeroMQ stream & socket.
""" """
if self._stream and not self._stream.closed:
# close the ZeroMQ stream
self._stream.close()
if self._router and not self._router.closed:
# close the ZeroMQ router socket
self._router.close()
ioloop = tornado.ioloop.IOLoop.instance() ioloop = tornado.ioloop.IOLoop.instance()
ioloop.stop() ioloop.stop()
def _cleanup(self, stop=True): def _cleanup(self, signum=None, stop=True):
""" """
Shuts down any running module processes Shuts down any running module processes
and closes remaining Tornado ioloop file descriptors and adds a callback to stop the event loop & ZeroMQ
:param signum: signal number (if called by the signal handler)
:param stop: stops the ioloop if True (default) :param stop: stops the ioloop if True (default)
""" """
# terminate all modules # terminate all modules
for module in self._modules: for module in self._modules:
# if not sys.platform.startswith("win"):
# module.join(timeout=0.5)
if module.is_alive(): if module.is_alive():
log.info("terminating {}".format(module.name)) log.info("terminating {}".format(module.name))
module.terminate() module.terminate()
@ -234,4 +244,7 @@ class Server(object):
if stop: if stop:
ioloop = tornado.ioloop.IOLoop.instance() ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback_from_signal(self._shutdown) if signum:
ioloop.add_callback_from_signal(self._shutdown)
else:
self._shutdown()