From ac5c705052d8b4e14c0e68021eef381f2a3b10dd Mon Sep 17 00:00:00 2001 From: Fierelier Date: Sun, 29 May 2022 14:31:07 +0200 Subject: [PATCH] Rewrite --- LICENSE | 9 + client-threaded.py | 149 --------- .../fstream-util-pipe_to_tcp.py | 0 .../fstream-util-tcp_to_pipe.py | 0 client.py => client/fstream.py | 257 +++++++++------- fsockets.py | 90 ++++++ modules/clients.py | 95 ++++++ modules/connlimit.py | 11 + modules/events.py | 30 ++ modules/exceptions.py | 19 ++ modules/fstream/authent.py | 16 + modules/fstream/commands.py | 36 +++ modules/fstream/main.mods | 5 + modules/fstream/main.py | 95 ++++++ modules/fstream/messages.py | 15 + modules/fstream/settings.py | 10 + modules/helpers.py | 15 + modules/main.mods | 9 + modules/servers.py | 56 ++++ modules/settings.py | 17 ++ server.py | 284 ------------------ 21 files changed, 669 insertions(+), 549 deletions(-) create mode 100644 LICENSE delete mode 100644 client-threaded.py rename fstream-util-pipe_to_tcp.py => client/fstream-util-pipe_to_tcp.py (100%) rename fstream-util-tcp_to_pipe.py => client/fstream-util-tcp_to_pipe.py (100%) rename client.py => client/fstream.py (58%) create mode 100644 fsockets.py create mode 100644 modules/clients.py create mode 100644 modules/connlimit.py create mode 100644 modules/events.py create mode 100644 modules/exceptions.py create mode 100644 modules/fstream/authent.py create mode 100644 modules/fstream/commands.py create mode 100644 modules/fstream/main.mods create mode 100644 modules/fstream/main.py create mode 100644 modules/fstream/messages.py create mode 100644 modules/fstream/settings.py create mode 100644 modules/helpers.py create mode 100644 modules/main.mods create mode 100644 modules/servers.py create mode 100644 modules/settings.py delete mode 100644 server.py diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9173a77 --- /dev/null +++ b/LICENSE @@ -0,0 +1,9 @@ +MIT License + +Copyright (c) 2022 + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/client-threaded.py b/client-threaded.py deleted file mode 100644 index 069e56d..0000000 --- a/client-threaded.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -import sys - -oldexcepthook = sys.excepthook -def newexcepthook(type,value,traceback): - oldexcepthook(type,value,traceback) - #input("Press ENTER to quit.") -sys.excepthook = newexcepthook - -import os -p = os.path.join -pUp = os.path.dirname -s = False -if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): - s = os.path.realpath(sys.executable) -else: - s = os.path.realpath(__file__) -sp = pUp(s) - -# script start -import subprocess -import socket -import threading -import queue - -def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) - -bufferSize = 8192 # buffer size in bytes -timeout = 15 # timeout in seconds -connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - -unbufferedStdout = os.fdopen(sys.stdout.fileno(),"wb",0) # Make unbuffered stdout - -q = queue.Queue() - -class stdoutThread(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - - def run(self): - while True: - data = q.get() - try: - while True: - data += q.get(False) - except Queue.Empty: - pass - unbufferedStdout.write(data) - -class sendThread(threading.Thread): - def __init__(self,connection): - threading.Thread.__init__(self) - - def run(self): - while True: - data = q.get() - try: - while True: - data += q.get(False) - except Queue.Empty: - pass - connection.sendall(data) - -def listToCommand(lst): - cmd = "" - for arg in lst: - arg = arg.replace("\\","\\\\") - arg = arg.replace(",","\\,") - cmd += arg + "," - - return cmd[:-1] - -def commandToList(cmd): - args = [] - cArg = "" - escape = False - for letter in cmd: - if escape == True: - cArg += letter - escape = False - continue - - if letter == "\\": - escape = True - continue - - if letter == ",": - if cArg == "": continue - args.append(cArg) - cArg = "" - continue - - cArg += letter - - args.append(cArg) - - return args - -def makePayload(lst): - cmdText = listToCommand(lst) - cmdBytes = cmdText.encode("utf-8") - cmdBytes += b" " * (1024 - len(cmdBytes)) - return cmdBytes - -def stringToAddressTuple(addr): - rtn = addr.rsplit(":",1) - rtn[1] = int(rtn[1]) - rtn = tuple(rtn) - return rtn - -def main(): - global serverAddr - serverAddr = stringToAddressTuple(sys.argv[1]) - global bufferSize - - eprint("Connecting to server...") - connection.settimeout(timeout) - connection.connect(serverAddr) - eprint("Sending payload...") - connection.sendall(makePayload(sys.argv[2:])) - eprint("Receiving payload...") - args = commandToList(connection.recv(1024).decode("utf-8").rstrip(" ")) - if sys.argv[2] == "watch": - bufferSize = int(args[0]) - try: - thr = stdoutThread() - thr.start() - eprint("Receiving data...") - while True: - data = connection.recv(bufferSize) - if data == b"": return - q.put(data) - except: - connection.close() - raise - - if sys.argv[2] == "broadcast": - bufferSize = int(sys.argv[3]) - try: - eprint("Sending data...") - while True: - data = sys.stdin.buffer.read(bufferSize) - connection.sendall(data) - except: - connection.close() - raise - -if __name__ == '__main__': - main() diff --git a/fstream-util-pipe_to_tcp.py b/client/fstream-util-pipe_to_tcp.py similarity index 100% rename from fstream-util-pipe_to_tcp.py rename to client/fstream-util-pipe_to_tcp.py diff --git a/fstream-util-tcp_to_pipe.py b/client/fstream-util-tcp_to_pipe.py similarity index 100% rename from fstream-util-tcp_to_pipe.py rename to client/fstream-util-tcp_to_pipe.py diff --git a/client.py b/client/fstream.py similarity index 58% rename from client.py rename to client/fstream.py index 9160394..94af096 100644 --- a/client.py +++ b/client/fstream.py @@ -1,116 +1,141 @@ -#!/usr/bin/env python3 -import sys - -oldexcepthook = sys.excepthook -def newexcepthook(type,value,traceback): - oldexcepthook(type,value,traceback) - #input("Press ENTER to quit.") -sys.excepthook = newexcepthook - -import os -p = os.path.join -pUp = os.path.dirname -s = False -if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): - s = os.path.realpath(sys.executable) -else: - s = os.path.realpath(__file__) -sp = pUp(s) - -# script start -import subprocess -import socket - -def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) - -bufferSize = 8192 # buffer size in bytes -timeout = 15 # timeout in seconds -connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - -unbufferedStdout = os.fdopen(sys.stdout.fileno(),"wb",0) # Make unbuffered stdout - -def listToCommand(lst): - cmd = "" - for arg in lst: - arg = arg.replace("\\","\\\\") - arg = arg.replace(",","\\,") - cmd += arg + "," - - return cmd[:-1] - -def commandToList(cmd): - args = [] - cArg = "" - escape = False - for letter in cmd: - if escape == True: - cArg += letter - escape = False - continue - - if letter == "\\": - escape = True - continue - - if letter == ",": - if cArg == "": continue - args.append(cArg) - cArg = "" - continue - - cArg += letter - - args.append(cArg) - - return args - -def makePayload(lst): - cmdText = listToCommand(lst) - cmdBytes = cmdText.encode("utf-8") - cmdBytes += b" " * (1024 - len(cmdBytes)) - return cmdBytes - -def stringToAddressTuple(addr): - rtn = addr.rsplit(":",1) - rtn[1] = int(rtn[1]) - rtn = tuple(rtn) - return rtn - -def main(): - global serverAddr - serverAddr = stringToAddressTuple(sys.argv[1]) - global bufferSize - - eprint("Connecting to server...") - connection.settimeout(timeout) - connection.connect(serverAddr) - eprint("Sending payload...") - connection.sendall(makePayload(sys.argv[2:])) - eprint("Receiving payload...") - args = commandToList(connection.recv(1024).decode("utf-8").rstrip(" ")) - - if sys.argv[2] == "watch": - bufferSize = int(args[0]) - try: - eprint("Receiving data...") - while True: - data = connection.recv(bufferSize) - if data == b"": return - unbufferedStdout.write(data) - except: - connection.close() - raise - - if sys.argv[2] == "broadcast": - bufferSize = int(sys.argv[3]) - try: - eprint("Sending data...") - while True: - data = sys.stdin.buffer.read(bufferSize) - connection.sendall(data) - except: - connection.close() - raise - -if __name__ == '__main__': - main() +#!/usr/bin/env python3 +import sys + +oldexcepthook = sys.excepthook +def newexcepthook(type,value,traceback): + oldexcepthook(type,value,traceback) + #input("Press ENTER to quit.") +sys.excepthook = newexcepthook + +import os +p = os.path.join +pUp = os.path.dirname +s = False +if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): + s = os.path.realpath(sys.executable) +else: + s = os.path.realpath(__file__) +sp = pUp(s) + +# script start +import subprocess +import socket +import time + +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) + +bufferSize = 4096 # max buffer size in bytes for receiving data, lower values shouldn't reduce the delay +bufferSizeStdin = 128 # min buffer size for buffer, lower values DO reduce delay but raise CPU usage +timeout = 15 # timeout in seconds +connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +unbufferedStdout = os.fdopen(sys.stdout.fileno(),"wb",0) # Make unbuffered stdout + +def listToCommand(lst): + cmd = "" + for arg in lst: + arg = arg.replace("\\","\\\\") + arg = arg.replace(",","\\,") + cmd += arg + "," + + return cmd[:-1] + +def commandToList(cmd): + args = [] + cArg = "" + escape = False + for letter in cmd: + if escape == True: + cArg += letter + escape = False + continue + + if letter == "\\": + escape = True + continue + + if letter == ",": + if cArg == "": continue + args.append(cArg) + cArg = "" + continue + + cArg += letter + + args.append(cArg) + + return args + +def recv(conn,l): + start = time.process_time() + timeo = conn.gettimeout() + bytes = b"" + while l > 0: + b = conn.recv(l) + if b == b"": raise ConnectionResetError + if time.process_time() - start > timeo: raise TimeoutError + bytes += b + l -= len(b) + return bytes + +def getResponse(connection,maxLength = 0): + data = b'' + data = recv(connection,4) + if not data: return False + nul = recv(connection,1) + if not nul: return False + if nul != b"\x00": return False + requestLength = int.from_bytes(data,"big") + if maxLength != 0 and requestLength > maxLength: return False + return recv(connection,requestLength) + +def sendResponse(connection,data): + connection.sendall(len(data).to_bytes(4,"big") + b"\x00" + data) + +def stringToAddressTuple(addr): + rtn = addr.rsplit(":",1) + rtn[1] = int(rtn[1]) + rtn = tuple(rtn) + return rtn + +def main(): + global serverAddr + serverAddr = stringToAddressTuple(sys.argv[1]) + global bufferSize + + eprint("Connecting to server...") + connection.settimeout(timeout) + connection.connect(serverAddr) + eprint("Sending payload...") + sendResponse(connection,sys.argv[2].encode("utf-8")) + + cmd = commandToList(sys.argv[2]) + args = {} + for arg in cmd[1:]: + argSplit = arg.split("=",1) + args[argSplit[0]] = argSplit[1] + cmd = cmd[0] + + if cmd == "watch": + try: + eprint("Receiving data...") + while True: + data = connection.recv(bufferSize) + if data == b"": return + unbufferedStdout.write(data) + except: + connection.close() + raise + + if cmd == "broadcast": + try: + eprint("Sending data...") + while True: + data = sys.stdin.buffer.read(bufferSizeStdin) + connection.sendall(data) + except: + connection.close() + raise + +if __name__ == '__main__': + main() diff --git a/fsockets.py b/fsockets.py new file mode 100644 index 0000000..aca0123 --- /dev/null +++ b/fsockets.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +import sys +import os +p = os.path.join +pUp = os.path.dirname +s = False +if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): + s = os.path.realpath(sys.executable) +else: + s = os.path.realpath(__file__) +sp = pUp(s) + +# script start +import socket +import threading +import queue + +# IMPORTANT! Obtain locks in this order, if you need multiple at once: +# - clientDataLock +# - clientsLock +# - serverThreadsLock +# - fileLock +modulePath = p(sp,"modules") + +mainQueue = queue.Queue() +clientsLock = threading.Lock() +clientID = 0 +clients = {} +clientDataLock = threading.Lock() +clientData = {} +serverThreadsLock = threading.Lock() +serverThreads = [] +fileLock = threading.Lock() + +def runCode(str, lcs = False, description = "loose-code"): + if lcs == False: lcs = {} + code = compile(str,description,"exec") + exec(code,globals(),lcs) + return lcs + +def runScript(sf, lcs = False): + if lcs == False: lcs = {} + + code = False + with fileLock: + with open(sf,"r",encoding="utf-8") as script: + code = script.read() + + runCode(code,lcs,sf) + return lcs + +def readModuleFile(path): + with open(path,"r",encoding="utf-8") as modulesFile: + for line in modulesFile: + line = line.split("#",1)[0].strip(" \t\r\n") + if line == "": continue + modType = line.rsplit(".",1)[-1].lower() + line = line.replace("\\","/") + + if modType == "mods": + print(">> " +line+ " <<") + else: + print("> " +line+ " ...") + + line = line.replace("/",os.path.sep) + if line.startswith("." +os.path.sep): + line = pUp(path) + line[1:] + else: + line = p(modulePath,line) + + if modType == "py": + runScript(line,locals()) + + if modType == "mods": + readModuleFile(line) + +def main(): + if os.path.isfile(p(modulePath,"main.mods")): + print("Loading modules...") + print(">> main.mods <<") + readModuleFile(p(modulePath,"main.mods")) + print("OK.\n") + + with serverThreadsLock: + for server in servers: + makeServer(*server) + + print("Serving!\n") + +main() \ No newline at end of file diff --git a/modules/clients.py b/modules/clients.py new file mode 100644 index 0000000..567909a --- /dev/null +++ b/modules/clients.py @@ -0,0 +1,95 @@ +global clientThreadIn +class clientThreadIn(threading.Thread): + def __init__(self,cID,connection,address): + threading.Thread.__init__(self) + self.cID = cID + self.connection = connection + self.address = address + + def run(self): + try: + clientLoopIn(self) + except Exception as e: + handleException(e) + + with clientDataLock: + with clientsLock: + closeClient(self.cID,0) + +global clientThreadOut +class clientThreadOut(threading.Thread): + def __init__(self,cID,connection,address): + threading.Thread.__init__(self) + self.cID = cID + self.connection = connection + self.address = address + + def run(self): + try: + clientLoopOut(self) + except Exception as e: + handleException(e) + + with clientDataLock: + with clientsLock: + closeClient(self.cID,1) + +global closeClient +def closeClient(cID,threadType = None): + try: # Close connection + clients[cID][0].close() + except: + pass + + try: # Set reference of connection to false, to denote the client is to not be served + clients[cID][0] = False + except: + pass + + try: # Set reference of the thread to false, to denote that it is closed + if threadType != None: + clients[cID][1 + threadType] = False + except: + pass + + try: # Get rid of leftover data to free memory + if clients[cID] == [False,False,False]: + del clients[cID] + del clientData[cID] + except: + pass + +global setClientData +def setClientData(cID,key,data): + clientData[cID][key] = data + +global getClientData +def getClientData(cID,key): + if not key in clientData[cID]: return None + return clientData[cID][key] + +def main(): + def onConnectionEvent(event,eEnv,connection,address): + with clientDataLock: + with clientsLock: + global clientID + clientID += 1 + cID = str(clientID) + threadIn = clientThreadIn(cID,connection,address) + threadOut = False + if enableOutThread: + threadOut = clientThreadOut(cID,connection,address) + clients[cID] = [connection,threadIn,threadOut] + clientData[cID] = {"address":address} + threadIn.start() + if enableOutThread: + threadOut.start() + + if clientDebug: + print("---") + print("Clients: " +str(len(clients))) + print("Threads: " +str(threading.active_count())) + return True + addEventHandler("onConnection",onConnectionEvent) + +main() \ No newline at end of file diff --git a/modules/connlimit.py b/modules/connlimit.py new file mode 100644 index 0000000..0259902 --- /dev/null +++ b/modules/connlimit.py @@ -0,0 +1,11 @@ +def main(): + def onConnectionEvent(event,eEnv,connection,address): + count = 0 + with clientDataLock: + for cID in clientData: + if getClientData(cID,"address") == address: + count += 1 + if count >= maxConnections: return False + return True + addEventHandler("onConnection",onConnectionEvent) +main() \ No newline at end of file diff --git a/modules/events.py b/modules/events.py new file mode 100644 index 0000000..6c17f80 --- /dev/null +++ b/modules/events.py @@ -0,0 +1,30 @@ +global eventHandlers +eventHandlers = {} + +global addEventHandler +def addEventHandler(event,handler): + if not event in eventHandlers: eventHandlers[event] = [] + try: + eventHandlers[event].remove(handler) + except: + pass + eventHandlers[event].append(handler) + +global removeEventHandler +def removeEventHandler(event,handler): + if not event in eventHandlers: return + try: + eventHandlers[event].remove(handler) + except: + pass + if len(eventHandlers[event]) == 0: + del eventHandlers[event] + +global triggerEvent +def triggerEvent(event,*args,eEnv=False,**kwargs): + if not eEnv: eEnv = {} + if not event in eventHandlers: return + for func in eventHandlers[event]: + result = func(event,eEnv,*args,**kwargs) + if result == False: return False + return True \ No newline at end of file diff --git a/modules/exceptions.py b/modules/exceptions.py new file mode 100644 index 0000000..f15dcf4 --- /dev/null +++ b/modules/exceptions.py @@ -0,0 +1,19 @@ +global traceback +import traceback + +global excConnectionClosed +class excConnectionClosed(Exception): pass + +global handleException +def handleException(e): + try: + if printExceptions: + print(traceback.format_exc()) + except: + try: + print(e) + except: + try: + print("Printing exception failed!") + except: + pass \ No newline at end of file diff --git a/modules/fstream/authent.py b/modules/fstream/authent.py new file mode 100644 index 0000000..f79f2fe --- /dev/null +++ b/modules/fstream/authent.py @@ -0,0 +1,16 @@ +global userPath +userPath = p(sp,"users") + +global configparser +import configparser + +global authenticate +def authenticate(user,passw): + fpath = p(userPath,user.replace("/","").replace("\\","").replace("..","") + ".ini") + if not os.path.isfile(fpath): return + config = configparser.ConfigParser() + with fileLock: config.read(fpath) + if not "DEFAULT" in config: config["DEFAULT"] = {} + if not config["DEFAULT"]["password"]: config["DEFAULT"]["password"] = "" + if passw != config["DEFAULT"]["password"]: return False + return True \ No newline at end of file diff --git a/modules/fstream/commands.py b/modules/fstream/commands.py new file mode 100644 index 0000000..8d95b2d --- /dev/null +++ b/modules/fstream/commands.py @@ -0,0 +1,36 @@ +global commandToList +def commandToList(cmd): + args = [] + cArg = "" + escape = False + for letter in cmd: + if escape == True: + cArg += letter + escape = False + continue + + if letter == "\\": + escape = True + continue + + if letter == ",": + if cArg == "": continue + args.append(cArg) + cArg = "" + continue + + cArg += letter + + args.append(cArg) + + return args + +global listToCommand +def listToCommand(lst): + cmd = "" + for arg in lst: + arg = arg.replace("\\","\\\\") + arg = arg.replace(",","\\,") + cmd += arg + "," + + return cmd[:-1] \ No newline at end of file diff --git a/modules/fstream/main.mods b/modules/fstream/main.mods new file mode 100644 index 0000000..eab4340 --- /dev/null +++ b/modules/fstream/main.mods @@ -0,0 +1,5 @@ +./settings.py +./messages.py +./commands.py +./authent.py +./main.py \ No newline at end of file diff --git a/modules/fstream/main.py b/modules/fstream/main.py new file mode 100644 index 0000000..8ab1aca --- /dev/null +++ b/modules/fstream/main.py @@ -0,0 +1,95 @@ +global select +import select +global time +import time + +global clientLoopIn +def clientLoopIn(self): + cmd = getResponse(self.connection,1024).decode("utf-8") + if cmd == False: return + cmd = commandToList(cmd) + args = {} + q = False + for arg in cmd[1:]: + argSplit = arg.split("=",1) + args[argSplit[0]] = argSplit[1] + + if not "channel" in args: args["channel"] = "default" + if not "channel-password" in args: args["channel-password"] = "" + if not "delay" in args: args["delay"] = 0.1 + if not "user-password" in args: args["user-password"] = "" + args["delay"] = float(args["delay"]) + if args["delay"] < minDelay: args["delay"] = minDelay + if args["delay"] > maxDelay: args["delay"] = maxDelay + + if not authenticate(args["user"],args["user-password"]): return + + with clientDataLock: + setClientData(self.cID,"type",cmd[0]) + setClientData(self.cID,"args",args) + + if cmd[0] == "broadcast": + setClientData(self.cID,"buffer",{}) + setClientData(self.cID,"bufferPacket",-1) + + if cmd[0] == "watch": + q = queue.Queue() + setClientData(self.cID,"queue",q) + + if cmd[0] == "broadcast": + buffer = getClientData(self.cID,"buffer") + packet = -1 + packetMin = 0 + bufferSize = 0 + lastReceived = time.process_time() + while True: + args["delay"] = 0 + data = self.connection.recv(connBuffer) + if data == b"": return + with clientDataLock: + dataSize = len(data) + if dataSize > maxBuffer: return + bufferSize += dataSize + bufferCost + while bufferSize > maxBuffer: + bufferSize -= len(buffer[str(packetMin)]) - bufferCost + del buffer[str(packetMin)] + packetMin += 1 + packet += 1 + buffer[str(packet)] = data + setClientData(self.cID,"bufferPacket",packet) + with clientsLock: + for cID in clients: + if getClientData(cID,"type") == "watch" and getClientData(cID,"args")["user"] == args["user"] and getClientData(cID,"args")["channel"] == args["channel"] and getClientData(cID,"args")["channel-password"] == args["channel-password"]: + getClientData(cID,"queue").put("") + + now = time.process_time() + timeSpent = now - lastReceived + wait = args["delay"] - timeSpent + if wait > 0: time.sleep(wait) + lastReceived = now + + if cmd[0] == "watch": + packet = -1 + watchID = False + data = b"" + with clientDataLock: + with clientsLock: + for cID in clients: + if getClientData(cID,"args")["user"] == args["user"] and getClientData(cID,"type") == "broadcast": + watchID = cID + packet = getClientData(cID,"bufferPacket") + + if watchID == False: return + if packet == -1: + q.get(timeout=timeout) + with clientDataLock: + packet = getClientData(watchID,"bufferPacket") + data = getClientData(watchID,"buffer")[str(packet)] + self.connection.sendall(data) + + while True: + q.get(timeout=timeout) + packet += 1 + with clientDataLock: + data = getClientData(watchID,"buffer")[str(packet)] + self.connection.sendall(data) \ No newline at end of file diff --git a/modules/fstream/messages.py b/modules/fstream/messages.py new file mode 100644 index 0000000..3aa93ed --- /dev/null +++ b/modules/fstream/messages.py @@ -0,0 +1,15 @@ +global getResponse +def getResponse(connection,maxLength = 0): + data = b'' + data = recv(connection,4) + if not data: return False + nul = recv(connection,1) + if not nul: return False + if nul != b"\x00": return False + requestLength = int.from_bytes(data,"big") + if maxLength != 0 and requestLength > maxLength: return False + return recv(connection,requestLength) + +global sendResponse +def sendResponse(connection,data): + connection.sendall(len(data).to_bytes(4,"big") + b"\x00" + data) \ No newline at end of file diff --git a/modules/fstream/settings.py b/modules/fstream/settings.py new file mode 100644 index 0000000..b3a249d --- /dev/null +++ b/modules/fstream/settings.py @@ -0,0 +1,10 @@ +global connBuffer +connBuffer = 1024 # How large can a buffer piece be in bytes? +global bufferCost +bufferCost = 1024 # Virtually add extra cost to each buffer piece to prevent clients from overloading the server by sending super small pieces. +global maxBuffer +maxBuffer = 20*1024*1024 # The maximum buffer size of a stream in bytes. Old buffers are discarded, clients that depend on them get disconnected. +global minDelay +minDelay = 0.05 # The minimum delay (pause between each buffer iteration) the user can set in seconds. Lower values cause higher CPU usage. +global maxDelay +maxDelay = 1 # The maximum delay the user can set. \ No newline at end of file diff --git a/modules/helpers.py b/modules/helpers.py new file mode 100644 index 0000000..50013fe --- /dev/null +++ b/modules/helpers.py @@ -0,0 +1,15 @@ +global time +import time + +global recv +def recv(conn,l): + start = time.process_time() + timeo = conn.gettimeout() + bytes = b"" + while l > 0: + b = conn.recv(l) + if b == b"": raise ConnectionResetError + if time.process_time() - start > timeo: raise TimeoutError + bytes += b + l -= len(b) + return bytes \ No newline at end of file diff --git a/modules/main.mods b/modules/main.mods new file mode 100644 index 0000000..8b8ed3a --- /dev/null +++ b/modules/main.mods @@ -0,0 +1,9 @@ +settings.py # User settings +helpers.py # Helper functions +events.py # Event/event handler implementation +exceptions.py # Handle exceptions, close connections +servers.py # Create sockets, optionally with SSL/TLS +connlimit.py # Optional: Limit the amount of connections made by one IP +clients.py # Create and remove client sessions and connections +#http/main.mods # HTTP server +fstream/main.mods # Media stream server \ No newline at end of file diff --git a/modules/servers.py b/modules/servers.py new file mode 100644 index 0000000..23c13fa --- /dev/null +++ b/modules/servers.py @@ -0,0 +1,56 @@ +global ssl +import ssl + +global serverThread +class serverThread(threading.Thread): + def __init__(self,socket,isHttps): + threading.Thread.__init__(self) + self.socket = socket + self.isHttps = isHttps + + def run(self): + connection = False + address = False + while True: + try: + connection,address = self.socket.accept() + except: + continue + + try: + if self.isHttps: + connection.settimeout(5) + connection.do_handshake() + connection.settimeout(timeout) + if not triggerEvent("onConnection",connection,address): raise excConnectionClosed + except Exception as e: + handleException(e) + try: + connection.close() + except: + pass + +global makeServer +def makeServer(host,port,https): + print("Opening " +str(host)+ ":" +str(port)+ " (" +str(https)+ ") ...") + serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serverSocket.bind((host,port)) + if https: + proto = False + if sys.version_info >= (3,10): + proto = ssl.PROTOCOL_TLS_SERVER + else: + proto = ssl.PROTOCOL_TLS + + ctx = ssl.SSLContext(proto) + ctx.check_hostname = False + ctx.load_cert_chain(https) + serverSocket = ctx.wrap_socket( + serverSocket, + server_side = True, + do_handshake_on_connect = False + ) + serverSocket.listen(65535) + thread = serverThread(serverSocket,not (https == False)) + serverThreads.append(thread) + thread.start() \ No newline at end of file diff --git a/modules/settings.py b/modules/settings.py new file mode 100644 index 0000000..3298918 --- /dev/null +++ b/modules/settings.py @@ -0,0 +1,17 @@ +global servers +servers = [ +# Host Port SSL Certificate + ("127.0.0.1", 61920, False), +# ("127.0.0.1", 443, "localhost.pem") +] + +global timeout +timeout = 15 # Seconds until the connection should be timed out +global maxConnections +maxConnections = 5 # Maximum connections per IP, needs connlimit.py to be activated +global enableOutThread +enableOutThread = False # Use a seperate thread for data output? +global printExceptions +printExceptions = False # Print exceptions as they happen, enable if you're developing +global clientDebug +clientDebug = False # Print how many clients and threads there are \ No newline at end of file diff --git a/server.py b/server.py deleted file mode 100644 index 11acb93..0000000 --- a/server.py +++ /dev/null @@ -1,284 +0,0 @@ -#!/usr/bin/env python3 -import sys - -oldexcepthook = sys.excepthook -def newexcepthook(type,value,traceback): - oldexcepthook(type,value,traceback) - #input("Press ENTER to quit.") -sys.excepthook = newexcepthook - -import os -p = os.path.join -pUp = os.path.dirname -s = False -if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'): - s = os.path.realpath(sys.executable) -else: - s = os.path.realpath(__file__) -sp = pUp(s) - -import threading -import queue -import socket -import configparser - -debug = True -bufferSize = 8096 # 8 KiB -timeout = 15 -maxClients = 100 -maxClientsPerIP = 10 -maxBroadcastsPerIP = 3 -maxAccumulatedData = 10485760 # 10 MiB -addresses = [ - ("127.0.0.1",61920) -] -userPath = p(sp,"users") - -clients = {} -clientsID = 0 -clientsLock = threading.Lock() - -pendingConnections = queue.Queue() - -def move (y, x): - print("\033[%d;%dH" % (y, x)) - -def listToCommand(lst): - cmd = "" - for arg in lst: - arg = arg.replace("\\","\\\\") - arg = arg.replace(",","\\,") - cmd += arg + "," - - return cmd[:-1] - -def commandToList(cmd): - args = [] - cArg = "" - escape = False - for letter in cmd: - if escape == True: - cArg += letter - escape = False - continue - - if letter == "\\": - escape = True - continue - - if letter == ",": - if cArg == "": continue - args.append(cArg) - cArg = "" - continue - - cArg += letter - - args.append(cArg) - - return args - -def makePayload(lst): - cmdText = listToCommand(lst) - cmdBytes = cmdText.encode("utf-8") - cmdBytes += b" " * (1024 - len(cmdBytes)) - return cmdBytes - -class socketThread(threading.Thread): - def __init__(self,ip): - threading.Thread.__init__(self) - self.ip = ip - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.bind(self.ip) - - def run(self): - self.socket.listen(32766) - while True: - connection = False - try: - connection,address = self.socket.accept() - pendingConnections.put((connection,address)) - except Exception as e: - try: - connection.close() - except: - pass - - print(e) - -class clientThread(threading.Thread): - def __init__(self,clientID,env): - threading.Thread.__init__(self) - self.clientID = clientID - self.env = env - self.queue = queue.Queue() - - def childRun(self): - if self.env["command"] == "watch": - self.env["connection"].sendall(makePayload([str(self.env["bsize"])])) - while True: - data = self.queue.get(timeout=timeout) - while self.queue.qsize() > 0: - try: - data += self.queue.get(False) - except Queue.Empty: - pass - - self.env["connection"].sendall(data) - - if self.env["command"] == "broadcast": - self.env["connection"].sendall(makePayload(["dummy"])) - while True: - data = self.env["connection"].recv(self.env["bsize"]) - if data == b"": return - with clientsLock: - for clientID in clients: - try: - if clients[clientID]["command"] != "watch": continue - if clients[clientID]["client"] != str(self.clientID): continue - clients[clientID]["thread"].queue.put(data) - except: - pass - - def run(self): - try: - self.childRun() - except Exception as e: - try: - print(e) - except: - print("Could not print exception.") - - try: - self.env["connection"].close() - except: - pass - - try: - with clientsLock: - del clients[str(self.clientID)] - except: - pass - -def handleConnection(connection,address): - global clientsID - args = commandToList(connection.recv(1024).decode("utf-8").rstrip(" ")) - command = args[0] - if command == "watch": - user = args[1] - room = args[2] - passw = args[3] - - watchID = False - with clientsLock: - for clientID in clients: - if clients[clientID]["command"] != "broadcast": continue - if clients[clientID]["user"] != user: continue - if clients[clientID]["room"] != room: continue - if clients[clientID]["passw"] != passw: continue - watchID = clientID - break - - if watchID == False: - connection.close() - return - - env = { - "command": "watch", - "client": watchID, - "connection": connection, - "address": address, - "bsize": clients[watchID]["bsize"] - } - - clientsID += 1 - thr = clientThread(clientsID,env.copy()) - env["thread"] = thr - clients[str(clientsID)] = env - thr.start() - return - - if command == "broadcast": - bsize = int(args[1]) - user = args[2] - userPassw = args[3] - room = args[4] - roomPassw = args[5] - - config = configparser.ConfigParser() - config.read(p(userPath,user+ ".ini")) - if config["DEFAULT"]["password"] != userPassw: - connection.close() - return - - env = { - "command": "broadcast", - "user": user, - "room": room, - "passw": roomPassw, - "bsize": bsize, - "connection": connection, - "address": address - } - - with clientsLock: - clientsID += 1 - thr = clientThread(clientsID,env.copy()) - env["thread"] = thr - clients[str(clientsID)] = env - thr.start() - return - - connection.close() - -class debugThread(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - global time - import time - - def run(self): - while True: - clientCount = 0 - accumDataCount = 0 - threadCount = threading.active_count() - with clientsLock: - clientCount = len(clients) - for clientID in clients: - accumDataCount += (clients[clientID]["thread"].queue.qsize() * clients[clientID]["bsize"]) / 1000000 - - #move(0,0) - print(" --- ") - print(" Threads: " +str(threadCount)+ " ") - print(" Clients: " +str(clientCount)+ " ") - print(" Accumulated Data: " +str(accumDataCount)+ "MB ") - time.sleep(1) - -def main(): - for address in addresses: - print("Opening TCP socket " +address[0]+ ":" +str(address[1])+ "...") - thr = socketThread(address) - thr.start() - - print("Serving!") - if debug == True: - thr = debugThread() - thr.start() - - while True: - connection,address = pendingConnections.get() - try: - handleConnection(connection,address) - except Exception as e: - try: - connection.close() - except: - pass - - try: - print(e) - except: - print("Could not print exception.") - -if __name__ == '__main__': - main() \ No newline at end of file