#!/usr/bin/env python3
"""Module prelude: imports, script-path discovery and shared server state.

Everything defined here is module-level state shared by the worker threads
and by ``main``: the connection/thread registries, the locks guarding them,
the listening socket and the resource limits.
"""
import configparser
import os
import queue
import socket
import subprocess
import sys
import threading
import time  # used by the debug thread's one-second polling loop

# Wrap the interpreter's excepthook. The wrapper currently only delegates to
# the previous hook; it is the place to add a "Press ENTER to quit." pause
# (e.g. via input()) when the script runs in a console that closes on exit.
oldexcepthook = sys.excepthook


def newexcepthook(type, value, traceback):
    """Forward an uncaught exception to the previously installed hook.

    The parameter names mirror the ``sys.excepthook(type, value, traceback)``
    signature.
    """
    oldexcepthook(type, value, traceback)


sys.excepthook = newexcepthook

# Short aliases used throughout the file.
p = os.path.join
pUp = os.path.dirname

# Resolve the real path of the running program: the executable when running
# from a frozen bundle (sys.frozen set and sys._MEIPASS present), otherwise
# this source file.
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
    s = os.path.realpath(sys.executable)
else:
    s = os.path.realpath(__file__)
sp = pUp(s)  # directory containing the script/executable

# script start

# Registries keyed by str(threadId).
outThreads = {}    # outbound (watcher) threads
inThreads = {}     # inbound (handshake / broadcaster) threads
connections = {}   # (socket, address) tuples
threadId = 0       # last id handed out by main()

threadsLock = threading.Lock()      # guards outThreads, inThreads, threadId
fileLock = threading.Lock()         # serializes access to the user .ini files
connectionsLock = threading.Lock()  # guards connections

serverAddr = ("127.0.0.1", 12000)
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

bufferSize = 1000  # Buffer size in bytes
maxClients = 100  # How many clients can be connected at maximum?
maxClientsPerIP = 5  # How many clients can be connected at maximum, per IP?
# How much data can be in an outbound thread's queue at maximum before the
# connection is closed?
maxAccumulatedData = 20 * 1000 * 1000
def commandToList(cmd):
    """Split a comma-separated command string into a list of arguments.

    A backslash escapes the next character, so ``\\,`` yields a literal comma
    and ``\\\\`` a literal backslash. Empty arguments in front of a comma are
    skipped; whatever follows the last comma is always appended, even when it
    is empty (so ``""`` yields ``[""]`` and ``"a,"`` yields ``["a", ""]``).
    A trailing, dangling backslash is dropped.
    """
    args = []
    current = ""
    escaped = False
    for letter in cmd:
        if escaped:
            current += letter
            escaped = False
        elif letter == "\\":
            escaped = True
        elif letter == ",":
            if current:
                args.append(current)
                current = ""
        else:
            current += letter
    args.append(current)
    return args


def isSafeUserName(user):
    """Return True if ``user`` can safely be used as a file-name stem.

    The user name arrives from the network and is joined into a path under
    the ``users`` directory, so empty names and names containing a path
    separator or a NUL byte are rejected to prevent path traversal.
    """
    if not user:
        return False
    forbidden = {"/", "\\", "\0", os.sep}
    if os.altsep:
        forbidden.add(os.altsep)
    return not any(letter in forbidden for letter in user)


def passwordAccepted(config, cmd, password):
    """Return True if ``password`` satisfies the ``pass`` option of ``cmd``.

    ``config`` is the parsed user file. A command without a section, without
    a ``pass`` option, or with an empty ``pass`` needs no password; otherwise
    ``password`` must equal the configured value (``False`` never matches).
    """
    if cmd not in config:
        return True
    expected = config[cmd].get("pass", "")
    return expected == "" or expected == password


def closeConnectionById(connId):
    """Unregister and close the connection stored under ``str(connId)``.

    Does nothing when no such connection is registered, so it is safe to call
    more than once for the same id.
    """
    with connectionsLock:
        entry = connections.pop(str(connId), None)
    if entry is None:
        return
    try:
        entry[0].close()
    except OSError:
        print("warning, closing connection failed")


class outThread(threading.Thread):
    """Outbound worker: sends queued data to one watching client.

    Queue items are either bytes to send, or ``(callable, args, kwargs)``
    tuples that are executed on this thread.
    """

    def __init__(self, threadId, user):
        threading.Thread.__init__(self)
        self.threadId = threadId
        self.queue = queue.Queue()
        self.user = user
        # Set once the backlog limit was exceeded; broadcasters then stop
        # feeding this thread.
        self.ignore = False

    def closeConnection(self):
        """Close and unregister this thread's connection, if still present."""
        closeConnectionById(self.threadId)

    def getConnection(self):
        """Return the registered ``(socket, address)`` tuple, or False."""
        with connectionsLock:
            return connections.get(str(self.threadId), False)

    def closeThread(self):
        """Close the connection and remove this thread from ``outThreads``."""
        with threadsLock:
            self.closeConnection()
            # pop() instead of del: closing twice must not raise KeyError.
            outThreads.pop(str(self.threadId), None)

    def run(self):
        """Drain the queue until a close command arrives or sending fails."""
        try:
            while True:
                data = self.queue.get()
                if isinstance(data, tuple):
                    func, args, kwargs = data
                    func(*args, **kwargs)
                    if func == self.closeThread:
                        return
                    continue
                conn = self.getConnection()
                if not conn:
                    # The connection was already closed elsewhere (e.g. the
                    # backlog limit was hit); nothing left to send to.
                    break
                conn[0].sendall(data)
            self.closeThread()
        except Exception:
            self.closeThread()
            raise


class inThread(threading.Thread):
    """Inbound worker: reads the handshake of a new client and serves it.

    The handshake is one comma-separated command, ``watch,<user>[,<pass>]``
    or ``broadcast,<user>[,<pass>]``. A watcher is handed over to a new
    ``outThread``; a broadcaster stays on this thread, which relays
    everything it receives to the user's watchers.
    """

    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId
        # Known only after the handshake; None until then so closeThread()
        # can run safely at any point.
        self.user = None

    def closeConnection(self):
        """Close and unregister this thread's connection, if still present."""
        closeConnectionById(self.threadId)

    def getConnection(self):
        """Return the registered ``(socket, address)`` tuple, or None."""
        with connectionsLock:
            return connections.get(str(self.threadId))

    def closeThread(self, closeConnection=True):
        """Remove this thread from ``inThreads``.

        With ``closeConnection`` true, every outbound thread of the same user
        is additionally asked to close and this thread's own connection is
        closed. With it false the connection stays open (it has been handed
        over to an ``outThread``).
        """
        with threadsLock:
            if closeConnection:
                # NOTE(review): this also kicks the user's watchers when a
                # handshake merely failed authentication - confirm that this
                # is intended.
                if self.user is not None:
                    for watcher in outThreads.values():
                        if watcher.user == self.user:
                            watcher.queue.put((watcher.closeThread, [], {}))
                self.closeConnection()
            # pop() instead of del: closing twice must not raise KeyError.
            inThreads.pop(str(self.threadId), None)

    def run(self):
        """Process the handshake, then act as watcher hand-off or relay."""
        try:
            sock = self.getConnection()[0]
            data = sock.recv(1000)
            if data == b"":
                self.closeThread()
                return
            try:
                text = data.decode("utf-8")
            except UnicodeDecodeError:
                self.closeThread()
                return
            args = commandToList(text.rstrip(" "))
            if len(args) < 2:
                # A command without a user name cannot be served.
                self.closeThread()
                return
            cmd = args[0]
            user = args[1]
            password = args[2] if len(args) > 2 else False
            if not isSafeUserName(user):
                # Untrusted input: never let it escape the users directory.
                self.closeThread()
                return
            self.user = user

            userPath = p(sp, "users", user + ".ini")
            config = None
            with fileLock:
                if os.path.isfile(userPath):
                    config = configparser.ConfigParser()
                    config.read(userPath)
            if config is None:
                self.closeThread()
                return

            if cmd == "watch":
                if not passwordAccepted(config, cmd, password):
                    self.closeThread()
                    return
                with threadsLock:
                    watcher = outThread(self.threadId, self.user)
                    outThreads[str(self.threadId)] = watcher
                    watcher.start()
                # Must run outside threadsLock (closeThread acquires it and
                # the lock is not reentrant). The connection stays open for
                # the new outThread.
                self.closeThread(False)
                return

            if cmd == "broadcast":
                if not passwordAccepted(config, cmd, password):
                    self.closeThread()
                    return
                while True:
                    data = sock.recv(bufferSize)
                    if data == b"":
                        self.closeThread()
                        return
                    with threadsLock:
                        for watcher in outThreads.values():
                            if watcher.user != user or watcher.ignore:
                                # Other user's watcher, or one that is
                                # already being dropped for lagging behind.
                                continue
                            backlog = watcher.queue.qsize() * bufferSize
                            if backlog > maxAccumulatedData:
                                watcher.ignore = True
                                watcher.queue.put(
                                    (watcher.closeThread, [], {}))
                                # Close the socket right away so a blocked
                                # send fails instead of draining the backlog.
                                watcher.closeConnection()
                            else:
                                watcher.queue.put(data)

            # Unknown command.
            self.closeThread()
        except Exception:
            self.closeThread()
            raise


class debugThread(threading.Thread):
    """Prints thread, backlog and connection statistics once per second."""

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        # Imported locally so this thread does not depend on main() having
        # injected the module into the global namespace.
        import time

        while True:
            with threadsLock:
                print("\n---\n")
                print("Threads - IN: " + str(len(inThreads)))
                print("Threads - OUT: " + str(len(outThreads)))
                print("\nACCUMULATED DATA:")
                for outId, thread in outThreads.items():
                    print(outId + ": "
                          + str(thread.queue.qsize() * bufferSize))
                print("\nCONNECTIONS:")
                connCount = 0
                connCountIp = {}
                with connectionsLock:
                    for conn in connections.values():
                        ip = conn[1][0]
                        connCountIp[ip] = connCountIp.get(ip, 0) + 1
                        connCount += 1
                for ip, count in connCountIp.items():
                    print(ip + ": " + str(count))
                print("Overall: " + str(connCount))
            time.sleep(1)


def main():
    """Bind the server socket and accept clients forever.

    Each accepted connection gets a 15 second socket timeout. Connections
    beyond ``maxClients`` overall or ``maxClientsPerIP`` per address are
    closed immediately; every other connection is registered under a fresh
    thread id and handed to a new ``inThread``.
    """
    global threadId
    serverSocket.bind(serverAddr)
    serverSocket.listen(1024)

    # DEBUG
    debug = debugThread()
    debug.start()
    # DEBUG END

    while True:
        connection, address = serverSocket.accept()
        connection.settimeout(15)
        with connectionsLock:
            clientCount = len(connections)
            ipClientCount = sum(
                1 for conn in connections.values()
                if conn[1][0] == address[0])
        if (clientCount + 1 > maxClients
                or ipClientCount + 1 > maxClientsPerIP):
            connection.close()
            continue
        with threadsLock:
            threadId += 1
            with connectionsLock:
                connections[str(threadId)] = (connection, address)
            thread = inThread(threadId)
            inThreads[str(threadId)] = thread
            thread.start()


if __name__ == '__main__':
    main()