diff --git a/clientBlaster.py b/clientBlaster.py index a87bae0..bb6528b 100644 --- a/clientBlaster.py +++ b/clientBlaster.py @@ -19,237 +19,196 @@ sp = pUp(s) # script start import threading +import queue import socket -import struct +import traceback import time +import colorama +colorama.init() -addr = ("127.0.0.1",21779) +maxConnections = 10000 +maxConnectionsPerIp = 10 +maxQueueSize = 1000 +maxRequestSize = 4096 +pauseBetweenCommands = 0.1 -threads = {} -threadId = 0 -threadsLock = threading.Lock() -close = False +serverAddr = ("127.0.0.1",21779) +serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -eventHandlers = {} -eventHandlersLock = threading.Lock() +connectionsLock = threading.Lock() +connections = {} +connectionsId = 0 -def runCode(str, lcs = False, description = "loose-code"): - if lcs == False: lcs = {} - code = compile(str,description,"exec") - exec(code,globals(),lcs) - return lcs +heartbeatTime = 600 -def runScript(sf, lcs = False): - if lcs == False: lcs = {} - with open(sf) as script: - runCode(script.read(),lcs,sf) - return lcs +threadCount = 0 +threadCountLock = threading.Lock() -def getModlist(path): - modList = [] - for root,dirs,files in os.walk(path): - for file in dirs: - ffile = p(root,file) - lfile = ffile.replace(path + os.path.sep,"",1) - if lfile[0] == "-": continue - if lfile[0] == "[" and lfile[-1] == "]": - modList = modList + sorted(getModlist(ffile)) - continue - - modList.append(ffile) - break - - return modList +printLock = threading.Lock() +def tprint(st): + with printLock: + print(st) -def triggerEvent(event,*args,**kwargs): - with eventHandlersLock: - handlers = eventHandlers.copy() - - if not event in handlers: return - for func in handlers[event]: - cancel = func(event,*args,**kwargs) - if cancel: return True - - return False +def addThread(): + global threadCount + with threadCountLock: + threadCount += 1 + tprint(colorama.Fore.YELLOW + colorama.Style.BRIGHT + "Thread opened. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")" + colorama.Style.RESET_ALL) -def addEventHandler(event,func): - with eventHandlersLock: - if not event in eventHandlers: eventHandlers[event] = [] - eventHandlers[event].append(func) +def removeThread(): + global threadCount + with threadCountLock: + threadCount -= 1 + tprint(colorama.Fore.YELLOW + colorama.Style.BRIGHT + "Thread closed. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")" + colorama.Style.RESET_ALL) def sendResponse(connection,data): connection.sendall(len(data).to_bytes(4,"big") + data) -senderThreadSleepMin = 0.0333 -senderThreadSleepMax = 1.0 -senderThreadSleepIncr = 0.01 +def getResponse(connection): + data = b'' + data = connection.recv(4) + if not data: return False + requestLength = int.from_bytes(data,"big") + if requestLength > maxRequestSize: raise Exception("security","request_too_large") + return connection.recv(requestLength) -class senderThread(threading.Thread): - def __init__(self,connectionThread): +def closeConnection(connectionId): + if not connectionId in connections: return False + try: + connections[connectionId]["connection"].close() + except Exception as e: + tprint("Failed to close connection: " +str(e)) + pass + + try: + connections[connectionId]["threadOut"].queue.put(False) + except: + with printLock: + print(colorama.Fore.GREEN + colorama.Style.BRIGHT) + traceback.print_exc() + print(colorama.Style.RESET_ALL) + + del connections[connectionId] + return True + +class connectionThreadOut(threading.Thread): + def __init__(self,connectionId): threading.Thread.__init__(self) - self.lock = threading.Lock() - with self.lock: - self.connectionThread = connectionThread - self.queue = [] - self.newQueue = False - self.sleep = senderThreadSleepMin + self.queue = queue.Queue() + self.connectionId = connectionId - def closeThread(self): - with self.lock: - self.queue = [["close"]] - self.newQueue = True - - def addToQueue(self,entry): - with self.lock: - self.queue.append(entry) - self.newQueue = True + def getConnection(self): + with connectionsLock: + if self.connectionId in connections: + return connections[self.connectionId]["connection"] + return False def run(self): - sleepTime = 0 - while True: - with self.lock: - sleepTime = self.sleep - - #print(sleepTime) - time.sleep(sleepTime) - - with self.lock: - if not self.newQueue: - if self.sleep < senderThreadSleepMax: - self.sleep += senderThreadSleepIncr * self.sleep - if self.sleep > senderThreadSleepMax: self.sleep = senderThreadSleepMax - continue + try: + while True: + data = self.queue.get(timeout=heartbeatTime) + if data == False: return - for entry in self.queue: - if entry[0] == "close": return - entry[0](*entry[1],**entry[2]) - self.queue = [] - self.newQueue = False - self.sleep = senderThreadSleepMin - -class connectionThread(threading.Thread): - def __init__(self,threadId,connection,address): - threading.Thread.__init__(self) - self.lock = threading.Lock() - with self.lock: - self.threadId = threadId - self.connection = connection - self.address = address - self.closed = False - self.user = False - self.senderThread = senderThread(self) - self.senderThread.start() - - def closeThread(self): - with self.lock, threadsLock: - self.senderThread.closeThread() - try: - self.connection.close() - except: - print("failed to close connection, ignoring.") - pass - - del threads[str(self.threadId)] - print("thread closed: " +str(self.threadId)+ " (open: " +str(len(threads))+ ")") - self.closed = True - - def sendResponse(self,data,lock = True): - if lock == True: - with self.lock: - self.senderThread.addToQueue([sendResponse,[self.connection,data],{}]) - else: - self.senderThread.addToQueue([sendResponse,[self.connection,data],{}]) - - def run(self): - with self.lock: - print("thread opened: " +", ".join((str(self.threadId),str(self.address)))) - - while True: - try: - # get request length - data = b'' - data = self.connection.recv(4) - - if not data: - self.closeThread() + connection = self.getConnection() + if not connection: + with connectionsLock: closeConnection(self.connectionId) return - requestLength = int.from_bytes(data,"big") - - # inform about request - cancel = triggerEvent("onPreRequest",self,requestLength) - with self.lock: - if self.closed: - return - if cancel: continue - - # process request - cancel = triggerEvent("onRequest",self,requestLength) - with self.lock: - if self.closed: - return - if cancel: continue - except Exception as e: - cancel = False - try: - cancel = triggerEvent("onException",self,e) - except: - self.closeThread() - raise - - if cancel: continue - self.closeThread() - raise e + sendResponse(connection,data) + except Exception as e: + with connectionsLock: closeConnection(self.connectionId) + with printLock: + print(colorama.Fore.GREEN + colorama.Style.BRIGHT) + traceback.print_exc() + print(colorama.Style.RESET_ALL) + finally: + removeThread() -modulesLoaded = [] -modulePath = p(sp,"modules") -def moduleRun(localModule): - if not localModule in modulesLoaded: modulesLoaded.append(localModule) - print("> " +localModule+ "...") - runScript(p(modulePath,localModule,"module.py")) - -def moduleDepends(localModules): - if type(localModules) == str: localModules = [localModules] +class connectionThreadIn(threading.Thread): + def __init__(self,connectionId): + threading.Thread.__init__(self) + self.connectionId = connectionId - for localModule in localModules: - if localModule in modulesLoaded: return - print("depend ",end="") - moduleRun(localModule) + def getConnection(self): + with connectionsLock: + if self.connectionId in connections: + return connections[self.connectionId]["connection"] + return False + + def run(self): + try: + while True: + connection = self.getConnection() + if not connection: + with connectionsLock: closeConnection(self.connectionId) + return + + data = getResponse(connection) + if data == False: + with connectionsLock: closeConnection(self.connectionId) + return + + with connectionsLock: + if self.connectionId in connections: + queue = connections[self.connectionId]["threadOut"].queue + if queue.qsize() >= maxQueueSize: + closeConnection(self.connectionId) + return + queue.put(data) + time.sleep(pauseBetweenCommands) + except Exception as e: + with connectionsLock: closeConnection(self.connectionId) + with printLock: + print(colorama.Fore.GREEN + colorama.Style.BRIGHT) + traceback.print_exc() + print(colorama.Style.RESET_ALL) + finally: + removeThread() def main(): - print("Loading modules...") - for path in getModlist(modulePath): - if os.path.isfile(p(path,"module.py")): - localModule = path.replace(modulePath + os.path.sep,"",1) - if not localModule in modulesLoaded: - moduleRun(localModule) + global connectionsId + serverSocket.bind(serverAddr) + serverSocket.listen(65535) - print("\nServing on " +":".join(map(str,addr))+ "!") - - global socketServer - socketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - socketServer.bind(addr) - socketServer.listen(1000) - - global threadId - global close while True: - connection, address = socketServer.accept() + connection,address = serverSocket.accept() + connection.settimeout(heartbeatTime) - # inform about connection - with threadsLock: - if close: break - cancel = triggerEvent("onConnect",connection,address) - if close: break - if cancel: continue + with connectionsLock: + # Count connections + connectionsCount = 0 + connectionsCountIp = 0 + for connectionId in connections: + connectionsCount += 1 + if connections[connectionId]["address"][0] == address[0]: + connectionsCountIp += 1 - threadId += 1 - while str(threadId) in threads: - threadId += 1 + if connectionsCount >= maxConnections: + tprint("Connection closed - too many clients.") + closeConnection(connectionId) + continue - thread = connectionThread(threadId,connection,address) - threads[str(threadId)] = thread - thread.start() + if connectionsCountIp >= maxConnectionsPerIp: + tprint("Connection closed - same IP connected too many times.") + closeConnection(connectionId) + continue + + # Create connection + connectionsId += 1 + threadIn = connectionThreadIn(str(connectionsId)) + threadOut = connectionThreadOut(str(connectionsId)) + connections[str(connectionsId)] = { + "connection": connection, + "address": address, + "threadOut": threadOut, + "threadIn": threadIn, + "user": False + } + + addThread() + addThread() + threadOut.start() + threadIn.start() if __name__ == '__main__': main() \ No newline at end of file diff --git a/serverBlaster.py b/serverBlaster.py index d34b6b2..a893310 100644 --- a/serverBlaster.py +++ b/serverBlaster.py @@ -49,7 +49,7 @@ def getResponse(connection): def main(): global connection connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - connection.connect(("127.0.0.1",21779)) + connection.connect((sys.argv[1],int(sys.argv[2]))) thread = receiverThread(connection) thread.start()