#!/usr/bin/env python3
"""Threaded TCP relay server.

Each client opens a connection and sends one comma-separated command:

    watch,<user>[,<password>]      receive the data broadcast for <user>
    broadcast,<user>[,<password>]  send data to every watcher of <user>

A user is valid when ``users/<user>.ini`` exists next to this script.  That
file may contain a ``[watch]`` and/or ``[broadcast]`` section with a ``pass``
option; a non-empty ``pass`` must match the password sent by the client.
"""
import sys
oldexcepthook = sys.excepthook


def newexcepthook(type, value, traceback):
    """Forward uncaught exceptions to the previously installed hook."""
    oldexcepthook(type, value, traceback)
    #input("Press ENTER to quit.")


sys.excepthook = newexcepthook
import os
p = os.path.join
pUp = os.path.dirname
s = False
# When frozen by PyInstaller the script location is the executable itself.
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
    s = os.path.realpath(sys.executable)
else:
    s = os.path.realpath(__file__)
sp = pUp(s)

# script start
import threading
import queue
import socket
import subprocess
import configparser
import time

# Registry of live connections, keyed by connection id (a string).  Every
# access must happen while holding connectionsLock.
connections = {}
connectionsId = 0
connectionsLock = threading.Lock()

threadCount = 0
threadCountLock = threading.Lock()

fileLock = threading.Lock()

serverAddr = ("127.0.0.1", 61920)
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

bufferSize = 10000  # Buffer size in bytes
timeout = 15  # How long to wait for a connection to respond before timing out?
maxClients = 20  # How many clients can be connected at maximum?
maxClientsPerIP = 3  # How many clients can be connected at maximum, per IP?
# How much data can be in an outbound thread's queue at maximum before the
# connection is closed? The maximum amount of queue data is
# maxAccumulatedData * maxClients bytes.
maxAccumulatedData = 50*1000*1000
# Maximum upload speed for broadcasters. This sucks right now. Set to 0 to
# disable.
maxInboundTransferRate = 0


def addThread():
    """Increment the worker-thread counter and log the new total."""
    global threadCount
    with threadCountLock:
        threadCount += 1
        print("Thread opened. Threads: " + str(threadCount) + " (Actual: " + str(threading.active_count()) + ")")


def removeThread():
    """Decrement the worker-thread counter and log the new total."""
    global threadCount
    with threadCountLock:
        threadCount -= 1
        print("Thread closed. Threads: " + str(threadCount) + " (Actual: " + str(threading.active_count()) + ")")


def commandToList(cmd):
    """Split a comma-separated command string into its arguments.

    A backslash escapes the next character, so ``\\,`` yields a literal comma.
    Empty arguments between commas are skipped, but the final argument is
    always appended, even when it is empty (``""`` -> ``[""]``).
    Quoting is not supported.
    """
    args = []
    cArg = ""
    escape = False
    for letter in cmd:
        if escape:
            cArg += letter
            escape = False
            continue
        if letter == "\\":
            escape = True
            continue
        if letter == ",":
            if cArg == "":
                continue
            args.append(cArg)
            cArg = ""
            continue
        cArg += letter
    args.append(cArg)
    return args


def closeConnection(connectionId):
    """Close the socket of *connectionId* and remove it from the registry.

    Unknown ids are ignored.  The caller must hold ``connectionsLock``.
    """
    if connectionId not in connections:
        return
    try:
        connections[connectionId]["connection"].close()
    except OSError as e:
        print("Failed to close connection: " + str(e))
    del connections[connectionId]


class outThread(threading.Thread):
    """Sends queued data chunks to one watching client."""

    def __init__(self, connectionId):
        threading.Thread.__init__(self)
        self.queue = queue.Queue()
        self.connectionId = connectionId

    def getConnection(self):
        """Return this thread's socket, or False once it has been closed."""
        with connectionsLock:
            if self.connectionId in connections:
                return connections[self.connectionId]["connection"]
        return False

    def run(self):
        """Forward queued chunks until the connection goes away or idles out."""
        try:
            while True:
                try:
                    data = self.queue.get(timeout=timeout)
                except queue.Empty:
                    # Nothing was broadcast for a whole timeout period.
                    with connectionsLock:
                        closeConnection(self.connectionId)
                    print("Thread closed - No data to send before timeout.")
                    removeThread()
                    return
                connection = self.getConnection()
                if not connection:
                    removeThread()
                    return
                connection.sendall(data)
        except Exception:
            with connectionsLock:
                closeConnection(self.connectionId)
            removeThread()
            print("Thread closed - Exception:")
            raise


class inThread(threading.Thread):
    """Handles the initial command of a client and, for broadcasters, the upload."""

    def __init__(self, connectionId):
        threading.Thread.__init__(self)
        self.connectionId = connectionId

    def getConnection(self):
        """Return this thread's socket, or False once it has been closed."""
        with connectionsLock:
            if self.connectionId in connections:
                return connections[self.connectionId]["connection"]
        return False

    def _drop(self, message=None):
        """Close this thread's connection, optionally log why, and count the thread out."""
        with connectionsLock:
            closeConnection(self.connectionId)
        if message is not None:
            print(message)
        removeThread()

    @staticmethod
    def _passwordRejected(config, cmd, password):
        """Return True when the user's config sets a non-empty password for *cmd* that differs from *password*."""
        if cmd not in config or "pass" not in config[cmd]:
            return False
        expected = config[cmd]["pass"]
        return expected != "" and expected != password

    def run(self):
        """Read the client's command, authenticate it, then watch or broadcast."""
        try:
            connection = self.getConnection()
            if not connection:
                removeThread()
                return
            data = connection.recv(1000)
            if data == b"":
                self._drop("Thread closed - Client disconnected.")
                return

            # rstrip cannot fail on an all-space message, unlike data[-1].
            args = commandToList(data.decode("utf-8").rstrip(" "))
            cmd = args.pop(0)
            if not args or args[0] == "":
                self._drop("Thread closed - No user given.")
                return
            user = args[0]
            self.user = user
            password = args[1] if len(args) > 1 else False

            # The user name comes straight from the network and is used in a
            # file path: refuse anything that is not a plain file name.
            if user in (".", "..") or os.path.basename(user) != user:
                self._drop("Thread closed - Invalid user given: " + user)
                return

            userPath = p(sp, "users", user + ".ini")
            config = configparser.ConfigParser()
            with fileLock:
                userExists = os.path.isfile(userPath)
                if userExists:
                    config.read(userPath)
            if not userExists:
                self._drop("Thread closed - Invalid user given: " + user)
                return

            if cmd == "watch":
                if self._passwordRejected(config, cmd, password):
                    self._drop("Thread closed - Invalid password given for user: " + user)
                    return
                self._startWatching(user)
                return

            if cmd == "broadcast":
                if self._passwordRejected(config, cmd, password):
                    self._drop("Thread closed - Invalid password given for user: " + user)
                    return
                self._broadcast(user)
                return

            # Unknown command.
            self._drop()
        except Exception:
            with connectionsLock:
                closeConnection(self.connectionId)
            removeThread()
            print("Thread closed - Exception:")
            raise

    def _startWatching(self, user):
        """Hand the connection over to a new outThread; this thread then ends."""
        with connectionsLock:
            if self.connectionId in connections:
                entry = connections[self.connectionId]
                entry["action"] = "watch"
                entry["user"] = user
                thread = outThread(self.connectionId)
                entry["outThread"] = thread
                thread.start()
                addThread()
                entry["inThread"] = False
        removeThread()

    def _broadcast(self, user):
        """Receive chunks from the broadcaster and queue them for every watcher of *user*."""
        with connectionsLock:
            # A user has a single broadcaster: kick any previous one.
            deleteList = [
                connectionId
                for connectionId, entry in connections.items()
                if connectionId != self.connectionId
                and entry["action"] == "broadcast"
                and entry["user"] == user
            ]
            for connectionId in deleteList:
                closeConnection(connectionId)
            if self.connectionId not in connections:
                removeThread()
                return
            connections[self.connectionId]["action"] = "broadcast"
            connections[self.connectionId]["user"] = user

        if maxInboundTransferRate > 0:
            startTime = time.time()
        while True:
            connection = self.getConnection()
            if not connection:
                removeThread()
                return
            data = connection.recv(bufferSize)
            if data == b"":
                self._drop("Thread closed - Client disconnected.")
                return

            with connectionsLock:
                overloaded = []
                for connectionId, connectionData in connections.items():
                    thread = connectionData["outThread"]
                    if not thread:
                        continue
                    if connectionData["user"] != user:
                        continue
                    # Estimate only: assumes every queued chunk is bufferSize bytes.
                    accumulatedData = thread.queue.qsize() * bufferSize
                    if accumulatedData > maxAccumulatedData:
                        overloaded.append(connectionId)
                        continue
                    thread.queue.put(data)
                # Drop slow watchers after the loop (the registry must not
                # change while it is iterated).  Their outThread notices the
                # missing connection and ends itself.
                for connectionId in overloaded:
                    print("Thread closed - Too much accumulated data.")
                    closeConnection(connectionId)

            if maxInboundTransferRate > 0:
                endTime = time.time()
                timeTaken = endTime - startTime
                if timeTaken <= 0:
                    timeTaken = 0.00001  # this isn't good enough
                transferSpeed = bufferSize / timeTaken
                if transferSpeed > maxInboundTransferRate:
                    maxSleep = (bufferSize/maxInboundTransferRate) - timeTaken
                    if maxSleep > 0:
                        sleepTime = (transferSpeed / maxInboundTransferRate) - 1
                        if sleepTime > maxSleep:
                            sleepTime = maxSleep
                        time.sleep(sleepTime)
                startTime = endTime


class debugThread(threading.Thread):
    """Prints thread, queue and per-IP connection statistics once per second."""

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        while True:
            # Take one snapshot under the lock; connectionsLock is not
            # reentrant, so it must be acquired only once per pass.
            with connectionsLock:
                inCount = sum(1 for conn in connections.values() if conn["inThread"])
                accumulated = {
                    connId: conn["outThread"].queue.qsize() * bufferSize
                    for connId, conn in connections.items()
                    if conn["outThread"]
                }
                connCountIp = {}
                for conn in connections.values():
                    ip = conn["address"][0]
                    connCountIp[ip] = connCountIp.get(ip, 0) + 1
                connCount = len(connections)

            print("\n---\n")
            print("Threads - IN: " + str(inCount))
            print("Threads - OUT: " + str(len(accumulated)))
            print("\nACCUMULATED DATA:")
            for threadId in accumulated:
                print(threadId + ": " + str(accumulated[threadId]))
            print("\nCONNECTIONS:")
            for ip in connCountIp:
                print(ip + ": " + str(connCountIp[ip]))
            print("Overall: " + str(connCount))
            time.sleep(1)


def readConfig():
    """Load ``serverAddr`` ("host:port") from the ini file named after this script.

    The file shares the script's path with the extension replaced by ``.ini``
    and must hold the option in a ``[default]`` section.  When the file, the
    section or the option is missing, the built-in ``serverAddr`` is kept.
    """
    global serverAddr
    config = configparser.ConfigParser()
    config.read(p(os.path.splitext(s)[0] + ".ini"))
    if not config.has_option("default", "serverAddr"):
        return
    host, port = config["default"]["serverAddr"].rsplit(":", 1)
    serverAddr = (host, int(port))


def main():
    """Accept clients forever, enforcing the global and per-IP connection limits."""
    global connectionsId
    readConfig()
    serverSocket.bind(serverAddr)
    serverSocket.listen(1024)

    # DEBUG
    #debug = debugThread()
    #debug.start()
    # DEBUG END

    while True:
        connection, address = serverSocket.accept()
        connection.settimeout(timeout)
        with connectionsLock:
            clientCount = len(connections)
            ipClientCount = sum(
                1 for entry in connections.values()
                if entry["address"][0] == address[0]
            )
            # Refuse the NEW connection; already registered clients are kept.
            if clientCount + 1 > maxClients:
                print("Connection closed - too many clients.")
                connection.close()
                continue
            if ipClientCount + 1 > maxClientsPerIP:
                print("Connection closed - same IP connected too many times.")
                connection.close()
                continue

            connectionsId += 1
            threadIn = inThread(str(connectionsId))
            connections[str(connectionsId)] = {
                "connection": connection,
                "address": address,
                "inThread": threadIn,
                "outThread": False,
                "action": False,
                "user": False
            }
            threadIn.start()
            addThread()


if __name__ == '__main__':
    main()