#!/usr/bin/env python3 import sys oldexcepthook = sys.excepthook def newexcepthook(type,value,traceback): oldexcepthook(type,value,traceback) #input("Press ENTER to quit.") sys.excepthook = newexcepthook import os p = os.path.join pUp = os.path.dirname s = False if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): s = os.path.realpath(sys.executable) else: s = os.path.realpath(__file__) sp = pUp(s) import threading import queue import socket import configparser debug = True bufferSize = 8096 # 8 KiB timeout = 15 maxClients = 100 maxClientsPerIP = 10 maxBroadcastsPerIP = 3 maxAccumulatedData = 10485760 # 10 MiB addresses = [ ("127.0.0.1",61920), ("136.243.209.174",61920) ] userPath = p(sp,"users") clients = {} clientsID = 0 clientsLock = threading.Lock() pendingConnections = queue.Queue() def move (y, x): print("\033[%d;%dH" % (y, x)) def listToCommand(lst): cmd = "" for arg in lst: arg = arg.replace("\\","\\\\") arg = arg.replace(",","\\,") cmd += arg + "," return cmd[:-1] def commandToList(cmd): args = [] cArg = "" escape = False for letter in cmd: if escape == True: cArg += letter escape = False continue if letter == "\\": escape = True continue if letter == ",": if cArg == "": continue args.append(cArg) cArg = "" continue cArg += letter args.append(cArg) return args def makePayload(lst): cmdText = listToCommand(lst) cmdBytes = cmdText.encode("utf-8") cmdBytes += b" " * (1024 - len(cmdBytes)) return cmdBytes class socketThread(threading.Thread): def __init__(self,ip): threading.Thread.__init__(self) self.ip = ip self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.bind(self.ip) def run(self): self.socket.listen(32766) while True: connection = False try: connection,address = self.socket.accept() pendingConnections.put((connection,address)) except Exception as e: try: connection.close() except: pass print(e) class clientThread(threading.Thread): def __init__(self,clientID,env): threading.Thread.__init__(self) self.clientID = clientID self.env = env self.queue = queue.Queue() def childRun(self): if self.env["command"] == "watch": self.env["connection"].sendall(makePayload([str(self.env["bsize"])])) while True: data = self.queue.get(timeout=timeout) while self.queue.qsize() > 0: try: data += self.queue.get(False) except Queue.Empty: pass self.env["connection"].sendall(data) if self.env["command"] == "broadcast": self.env["connection"].sendall(makePayload(["dummy"])) while True: data = self.env["connection"].recv(self.env["bsize"]) if data == b"": return with clientsLock: for clientID in clients: try: if clients[clientID]["command"] != "watch": continue if clients[clientID]["client"] != str(self.clientID): continue clients[clientID]["thread"].queue.put(data) except: pass def run(self): try: self.childRun() except Exception as e: try: print(e) except: print("Could not print exception.") try: self.env["connection"].close() except: pass try: with clientsLock: del clients[str(self.clientID)] except: pass def handleConnection(connection,address): global clientsID args = commandToList(connection.recv(1024).decode("utf-8").rstrip(" ")) command = args[0] if command == "watch": user = args[1] room = args[2] passw = args[3] watchID = False with clientsLock: for clientID in clients: if clients[clientID]["command"] != "broadcast": continue if clients[clientID]["user"] != user: continue if clients[clientID]["room"] != room: continue if clients[clientID]["passw"] != passw: continue watchID = clientID break if watchID == False: connection.close() return env = { "command": "watch", "client": watchID, "connection": connection, "address": address, "bsize": clients[watchID]["bsize"] } clientsID += 1 thr = clientThread(clientsID,env.copy()) env["thread"] = thr clients[str(clientsID)] = env thr.start() return if command == "broadcast": bsize = int(args[1]) user = args[2] userPassw = args[3] room = args[4] roomPassw = args[5] config = configparser.ConfigParser() config.read(p(userPath,user+ ".ini")) if config["DEFAULT"]["password"] != userPassw: connection.close() return env = { "command": "broadcast", "user": user, "room": room, "passw": roomPassw, "bsize": bsize, "connection": connection, "address": address } with clientsLock: clientsID += 1 thr = clientThread(clientsID,env.copy()) env["thread"] = thr clients[str(clientsID)] = env thr.start() return connection.close() class debugThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) global time import time def run(self): while True: clientCount = 0 accumDataCount = 0 threadCount = threading.active_count() with clientsLock: clientCount = len(clients) for clientID in clients: accumDataCount += (clients[clientID]["thread"].queue.qsize() * clients[clientID]["bsize"]) / 1000000 #move(0,0) print(" --- ") print(" Threads: " +str(threadCount)+ " ") print(" Clients: " +str(clientCount)+ " ") print(" Accumulated Data: " +str(accumDataCount)+ "MB ") time.sleep(1) def main(): for address in addresses: print("Opening TCP socket " +address[0]+ ":" +str(address[1])+ "...") thr = socketThread(address) thr.start() print("Serving!") if debug == True: thr = debugThread() thr.start() while True: connection,address = pendingConnections.get() try: handleConnection(connection,address) except Exception as e: try: connection.close() except: pass try: print(e) except: print("Could not print exception.") if __name__ == '__main__': main()