Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 139 lines 4.9 kB view raw
import socket
import logging
import time
from contextlib import closing

from Debug import Debug
from util import UpnpPunch
from util import Msgpack


class BroadcastServer(object):
    """UDP server that receives Msgpack-encoded commands and replies to them.

    Incoming messages are dispatched to ``action<Cmd>`` methods looked up on
    the instance (see ``handleMessage``); this class defines none itself, so
    they are presumably supplied by subclasses.
    """

    def __init__(self, service_name, listen_port=1544, listen_ip=''):
        """Store the listen address and the sender info attached to messages.

        Args:
            service_name: Value stored under the "service" key of
                ``sender_info``.
            listen_port: UDP port to bind when listening.
            listen_ip: Local address to bind; '' is passed to ``bind`` as-is.
        """
        self.log = logging.getLogger("BroadcastServer")
        self.listen_port = listen_port
        self.listen_ip = listen_ip

        self.running = False
        self.sock = None
        # NOTE(review): handleMessage also reads sender_info["peer_id"], which
        # is not set here - presumably added by a subclass or caller; confirm.
        self.sender_info = {"service": service_name}

    def createBroadcastSocket(self):
        """Create a broadcast-enabled UDP socket bound to the listen address.

        The bind is attempted up to three times, sleeping ``retry`` seconds
        (0, 1, 2) after each failure.

        Returns:
            The bound socket, or False if every bind attempt failed.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except Exception as err:
                self.log.warning("Error setting SO_REUSEPORT: %s" % err)

        for retry in range(3):
            try:
                sock.bind((self.listen_ip, self.listen_port))
                return sock
            except Exception as err:
                self.log.error(
                    "Socket bind to %s:%s error: %s, retry #%s" %
                    (self.listen_ip, self.listen_port, Debug.formatException(err), retry)
                )
                time.sleep(retry)

        # Every attempt failed: release the descriptor instead of leaking it.
        sock.close()
        return False

    def start(self):  # Listens for discover requests
        """Bind the listening socket and process messages until stopped.

        Blocks in a receive loop while ``self.running`` is true. Returns
        immediately (after logging an error) if the socket cannot be bound.
        """
        self.sock = self.createBroadcastSocket()
        if not self.sock:
            self.log.error("Unable to listen on port %s" % self.listen_port)
            return

        self.log.debug("Started on port %s" % self.listen_port)

        self.running = True

        while self.running:
            try:
                data, addr = self.sock.recvfrom(8192)
            except Exception as err:
                # stop() closes the socket, which makes recvfrom raise; only
                # report the error when we were not asked to stop.
                if self.running:
                    self.log.error("Listener receive error: %s" % err)
                continue

            if not self.running:
                break

            try:
                message = Msgpack.unpack(data)
                response_addr, message = self.handleMessage(addr, message)
                if message:
                    self.send(response_addr, message)
            except Exception as err:
                # A malformed or failing message must not kill the listener.
                self.log.error("Handlemessage error: %s" % Debug.formatException(err))
        self.log.debug("Stopped listening on port %s" % self.listen_port)

    def stop(self):
        """Clear the running flag and close the listening socket, if any."""
        self.log.debug("Stopping, socket: %s" % self.sock)
        self.running = False
        if self.sock:
            self.sock.close()

    def send(self, addr, message):
        """Send one message, or each message of a list, to ``addr`` over UDP.

        Each message dict is mutated: its "sender" key is set to
        ``self.sender_info`` before it is packed with ``Msgpack.pack``.

        Args:
            addr: Destination address, passed unchanged to ``sendto``.
            message: A message dict, or a list of message dicts; every
                dict must contain a "cmd" key.
        """
        if not isinstance(message, list):
            message = [message]

        for message_part in message:
            message_part["sender"] = self.sender_info

            self.log.debug("Send to %s: %s" % (addr, message_part["cmd"]))
            # Use the context-managed socket itself so it is closed after the
            # send; rebinding the name to a fresh socket would leak it.
            with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
                sock.sendto(Msgpack.pack(message_part), addr)

    def getMyIps(self):
        """Return the local IPs reported by ``UpnpPunch._get_local_ips()``."""
        return UpnpPunch._get_local_ips()

    def broadcast(self, message, port=None):
        """Broadcast ``message`` to 255.255.255.255 from every local IP.

        The message dict is mutated: its "sender" key is set to
        ``self.sender_info``. A failure on one IP is logged as a warning and
        does not prevent sending from the remaining IPs.

        Args:
            message: Message dict containing at least a "cmd" key.
            port: Destination UDP port; a falsy value means
                ``self.listen_port``.
        """
        if not port:
            port = self.listen_port

        my_ips = self.getMyIps()
        addr = ("255.255.255.255", port)

        message["sender"] = self.sender_info
        self.log.debug("Broadcast using ips %s on port %s: %s" % (my_ips, port, message["cmd"]))

        for my_ip in my_ips:
            try:
                # One short-lived socket per local IP, bound to that IP with
                # an OS-chosen port (0), closed when the block exits.
                with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.bind((my_ip, 0))
                    sock.sendto(Msgpack.pack(message), addr)
            except Exception as err:
                self.log.warning("Error sending broadcast using ip %s: %s" % (my_ip, err))

    def handleMessage(self, addr, message):
        """Dispatch a received message to the matching ``action<Cmd>`` method.

        The handler name is "action" plus the command with its first letter
        upper-cased. ``message["sender"]`` is mutated: its "ip" key is set to
        the address the datagram arrived from.

        Args:
            addr: Source address of the datagram; ``addr[0]`` is used as the
                sender's IP.
            message: Unpacked message dict with "cmd" and "sender" keys and
                an optional "params" key.

        Returns:
            A tuple ``((ip, broadcast_port), response)`` where ``response``
            is the handler's return value, or None when the message is
            skipped or the command is unknown.

        Raises:
            KeyError: If a required key is missing from the message, the
                sender, or ``self.sender_info``.
            IndexError: If "cmd" is an empty string.
        """
        self.log.debug("Got from %s: %s" % (addr, message["cmd"]))
        cmd = message["cmd"]
        params = message.get("params", {})
        sender = message["sender"]
        # Trust the datagram's source address over anything the peer claims.
        sender["ip"] = addr[0]

        func_name = "action" + cmd[0].upper() + cmd[1:]
        func = getattr(self, func_name, None)

        if sender["service"] != "zeronet" or sender["peer_id"] == self.sender_info["peer_id"]:
            # Skip messages not for us or sent by us
            message = None
        elif func:
            message = func(sender, params)
        else:
            self.log.debug("Unknown cmd: %s" % cmd)
            message = None

        return (sender["ip"], sender["broadcast_port"]), message