Use a classic thread for UDP server discovery. Closes #545.

This commit is contained in:
grossmj 2016-06-16 11:17:12 -06:00
parent 56051b1142
commit 32907ccac3

View File

@ -26,7 +26,7 @@ import socket
import json
import ipaddress
import asyncio
import select
import threading
import aiohttp
import aiohttp_cors
import functools
@ -59,7 +59,6 @@ class WebServer:
self._handler = None
self._start_time = time.time()
self._port_manager = PortManager(host)
self._running = False
@staticmethod
def instance(host=None, port=None):
@ -195,34 +194,45 @@ class WebServer:
('ipi_addr', in_addr)]
IP_PKTINFO = 8
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
membership = socket.inet_aton("239.42.42.1") + socket.inet_aton("0.0.0.0")
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_IP, IP_PKTINFO, 1)
sock.bind(("0.0.0.0", self._port))
log.info("UDP server discovery started on {}:{}".format("0.0.0.0", self._port))
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
membership = socket.inet_aton("239.42.42.1") + socket.inet_aton("0.0.0.0")
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_IP, IP_PKTINFO, 1)
try:
sock.bind(("", self._port))
except OSError as e:
log.error("UDP server discovery could not bind on port {}: {}".format(self._port, e))
return
while self._running:
ready_to_read, _, _ = select.select([sock], [], [], 1.0)
if ready_to_read:
data, ancdata, _, address = sock.recvmsg(255, socket.CMSG_LEN(255))
log.info("UDP server discovery started on port {}".format(self._port))
while self._loop.is_running():
try:
data, ancdata, _, address = sock.recvmsg(255, socket.CMSG_LEN(255))
except OSError as e:
log.error("Error while receiving UDP server discovery request: {}".format(e))
continue
cmsg_level, cmsg_type, cmsg_data = ancdata[0]
if cmsg_level == socket.SOL_IP and cmsg_type == IP_PKTINFO:
pktinfo = in_pktinfo.from_buffer_copy(cmsg_data)
request_address = ipaddress.IPv4Address(memoryview(pktinfo.ipi_addr).tobytes())
log.debug("UDP server discovery request received on {} using {}".format(socket.if_indextoname(pktinfo.ipi_ifindex),
receiving_interface = socket.if_indextoname(pktinfo.ipi_ifindex)
log.debug("UDP server discovery request received on {} using {}".format(receiving_interface,
request_address))
local_address = ipaddress.IPv4Address(memoryview(pktinfo.ipi_spec_dst).tobytes())
if self._host != "0.0.0.0" and self._host != str(local_address):
log.debug("Ignoring UDP discovery request received on {} instead of {}".format(local_address,
self._host))
continue
server_info = {"version": __version__,
"ip": str(local_address),
"port": self._port}
data = json.dumps(server_info)
sock.sendto(data.encode(), address)
log.debug("Sent server info to {}: {}".format(local_address, data))
log.debug("Sent server info to {}:{} {}".format(address[0], address[1], data))
time.sleep(1) # this is to prevent too many request to slow down the server
log.debug("UDP discovery stopped")
log.debug("UDP server discovery stopped")
def run(self):
"""
@ -282,7 +292,6 @@ class WebServer:
self._handler = app.make_handler(handler=RequestHandler)
server = self._run_application(self._handler, ssl_context)
self._loop.run_until_complete(server)
self._running = True
self._signal_handling()
self._exit_handling()
@ -290,8 +299,9 @@ class WebServer:
asyncio.async(self.start_shell())
if sys.platform.startswith("linux"):
# UDP discovery is only supported on
self._loop.run_in_executor(None, self._udp_server_discovery)
# UDP discovery is only supported on Linux
udp_server_discovery = threading.Thread(target=self._udp_server_discovery, daemon=True)
udp_server_discovery.start()
try:
self._loop.run_forever()
@ -301,7 +311,7 @@ class WebServer:
# TypeError: async() takes 1 positional argument but 3 were given
log.warning("TypeError exception in the loop {}".format(e))
finally:
self._running = False
if self._handler and self._loop.is_running():
self._loop.run_until_complete(self._handler.finish_connections())
server.close()