From eb05c89d4708a65f6860fa92e43e9e35e3579fc3 Mon Sep 17 00:00:00 2001 From: Fierelier Date: Thu, 22 Apr 2021 04:29:12 +0200 Subject: [PATCH] Server Remastered --- fstream-server.py | 242 +++++++++++++++++++++++++--------------------- 1 file changed, 130 insertions(+), 112 deletions(-) diff --git a/fstream-server.py b/fstream-server.py index 62f300d..1bb1177 100644 --- a/fstream-server.py +++ b/fstream-server.py @@ -25,14 +25,15 @@ import subprocess import configparser import time -outThreads = {} -inThreads = {} connections = {} -threadId = 0 -threadsLock = threading.Lock() -fileLock = threading.Lock() +connectionsId = 0 connectionsLock = threading.Lock() +threadCount = 0 +threadCountLock = threading.Lock() + +fileLock = threading.Lock() + serverAddr = ("127.0.0.1",61920) serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -43,6 +44,18 @@ maxClientsPerIP = 3 # How many clients can be connected at maximum, per IP? maxAccumulatedData = 50*1000*1000 # How much data can be in an outbound thread's queue at maximum before the connection is closed? The maximum amount of queue data is maxAccumulatedData * maxClients bytes. maxInboundTransferRate = 0 # Maximum upload speed for broadcasters. This sucks right now. Set to 0 to disable. +def addThread(): + global threadCount + with threadCountLock: + threadCount += 1 + print("Thread opened. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")") + +def removeThread(): + global threadCount + with threadCountLock: + threadCount -= 1 + print("Thread closed. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")") + def commandToList(cmd): args = [] cArg = "" @@ -75,87 +88,62 @@ def commandToList(cmd): return args +def closeConnection(connectionId): + if connectionId in connections: + del connections[connectionId] + class outThread(threading.Thread): - def __init__(self,threadId,user): + def __init__(self,connectionId): threading.Thread.__init__(self) - self.threadId = threadId self.queue = queue.Queue() - self.user = user - self.ignore = False - - def closeConnection(self): - with connectionsLock: - if str(self.threadId) in connections: - try: - connections[str(self.threadId)][0].close() - except Exception as e: - print("Warning, closing connection failed: " +str(e)) - del connections[str(self.threadId)] + self.connectionId = connectionId def getConnection(self): with connectionsLock: - if str(self.threadId) in connections: - return connections[str(self.threadId)] - - return False - - def closeThread(self): - with threadsLock: - self.closeConnection() - if str(self.threadId) in outThreads: del outThreads[str(self.threadId)] + if self.connectionId in connections: + return connections[self.connectionId]["connection"] + return False def run(self): try: while True: data = self.queue.get(timeout=15) - if type(data) == tuple: - data[0](*data[1],**data[2]) - if data[0] == self.closeThread: return - continue - self.getConnection()[0].sendall(data) - self.closeThread() + connection = self.getConnection() + if not connection: + removeThread() + return + connection.sendall(data) + with connectionsLock: closeConnection(self.connectionId) + removeThread() + return except: - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) + removeThread() print("Thread closed - Exception:") raise class inThread(threading.Thread): - def __init__(self,threadId): + def __init__(self,connectionId): threading.Thread.__init__(self) - self.threadId = threadId - - def closeConnection(self): - with connectionsLock: - if str(self.threadId) in connections: - try: - connections[str(self.threadId)][0].close() - except: - print("warning, closing connection failed") - del connections[str(self.threadId)] + self.connectionId = connectionId def getConnection(self): with connectionsLock: - if str(self.threadId) in connections: - return connections[str(self.threadId)] - - def closeThread(self,closeConnection = True): - with threadsLock: - if closeConnection: - for thread in outThreads: - thread = outThreads[thread] - if thread.user == self.user: - thread.queue.put((thread.closeThread,[],{})) - - self.closeConnection() - - if str(self.threadId) in inThreads: del inThreads[str(self.threadId)] + if self.connectionId in connections: + return connections[self.connectionId]["connection"] + return False def run(self): try: - data = self.getConnection()[0].recv(1000) + connection = self.getConnection() + if not connection: + removeThread() + return + data = connection.recv(1000) if data == b"": - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) print("Thread closed - Client disconnected.") + removeThread() return data = data.decode("utf-8") while data[-1] == " ": data = data[:-1] @@ -171,8 +159,9 @@ class inThread(threading.Thread): userPath = p(sp,"users",user+ ".ini") with fileLock: if not os.path.isfile(userPath): - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) print("Thread closed - Invalid user given: " +user) + removeThread() return config = configparser.ConfigParser() @@ -180,55 +169,73 @@ class inThread(threading.Thread): if cmd == "watch": if cmd in config and "pass" in config[cmd] and config[cmd]["pass"] != "" and config[cmd]["pass"] != password: - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) print("Thread closed - Invalid password given for user: " +user) + removeThread() return - with threadsLock: - thread = outThread(self.threadId,self.user) - outThreads[str(self.threadId)] = thread + with connectionsLock: + if not self.connectionId in connections: + removeThread() + return + connections[self.connectionId]["action"] = "watch" + connections[self.connectionId]["user"] = user + thread = outThread(self.connectionId) + connections[self.connectionId]["outThread"] = thread thread.start() - - self.closeThread(False) - return + addThread() + connections[self.connectionId]["inThread"] = False + removeThread() + return if cmd == "broadcast": if cmd in config and "pass" in config[cmd] and config[cmd]["pass"] != "" and config[cmd]["pass"] != password: - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) print("Thread closed - Invalid password given for user: " +user) + removeThread() return - with threadsLock: + with connectionsLock: deleteList = [] - for threadId in inThreads: - if threadId == str(self.threadId): continue - thread = inThreads[threadId] - thread.closeConnection() - deleteList.append(threadId) + for connectionId in connections: + if connectionId == self.connectionId: continue + if connections[connectionId]["action"] != "broadcast": continue + if connections[connectionId]["user"] != user: continue + deleteList.append(connectionId) - for threadId in deleteList: - del inThreads[threadId] + for connectionId in deleteList: + closeConnection[connectionId] + + connections[connectionId]["action"] = "broadcast" + connections[connectionId]["user"] = user if maxInboundTransferRate > 0: startTime = time.time() while True: - data = self.getConnection()[0].recv(bufferSize) + connection = self.getConnection() + if not connection: + removeThread() + return + data = connection.recv(bufferSize) if data == b"": - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) print("Thread closed - Client disconnected.") + removeThread() return - with threadsLock: - for thread in outThreads: - thread = outThreads[thread] - if thread.user == user: - accumulatedData = thread.queue.qsize() * bufferSize - if accumulatedData > maxAccumulatedData and not thread.ignore: - thread.ignore = True - thread.queue.put((thread.closeThread,[],{})) - thread.closeConnection() - print("Thread closed - Too much accumulated data.") - else: - thread.queue.put(data) + with connectionsLock: + for connectionId in connections: + connectionData = connections[connectionId] + thread = connectionData["outThread"] + if not thread: continue + if connectionData["user"] != user: continue + accumulatedData = thread.queue.qsize() * bufferSize + if accumulatedData > maxAccumulatedData: + print("Thread closed - Too much accumulated data.") + closeConnection(connectionId) + removeThread() + return + + thread.queue.put(data) if maxInboundTransferRate > 0: endTime = time.time() @@ -244,11 +251,13 @@ class inThread(threading.Thread): startTime = endTime - self.closeThread() + with connectionsLock: closeConnection(self.connectionId) + removeThread() return except: + with connectionsLock: closeConnection(self.connectionId) + removeThread() print("Thread closed - Exception:") - self.closeThread() raise class debugThread(threading.Thread): @@ -257,7 +266,7 @@ class debugThread(threading.Thread): def run(self): while True: - with threadsLock: + with connectionsLock: print("\n---\n") print("Threads - IN: " +str(len(inThreads))) print("Threads - OUT: " +str(len(outThreads))) @@ -294,15 +303,15 @@ def readConfig(): serverAddr = tuple(serverAddrSplit) def main(): - global threadId + global connectionsId readConfig() serverSocket.bind(serverAddr) serverSocket.listen(1024) # DEBUG - debug = debugThread() - debug.start() + #debug = debugThread() + #debug.start() # DEBUG END while True: @@ -312,25 +321,34 @@ def main(): with connectionsLock: clientCount = 0 ipClientCount = 0 - for connId in connections: + for connectionId in connections: clientCount += 1 - conn = connections[connId] - if conn[1][0] == address[0]: + if connections[connectionId]["address"][0] == address[0]: ipClientCount += 1 - if clientCount + 1 > maxClients or ipClientCount + 1 > maxClientsPerIP: - print("Connection closed - same IP connected too many times.") - connection.close() + if clientCount + 1 > maxClients: + print("Connection closed - too many clients.") + closeConnection(connectionId) continue - - with threadsLock: - threadId += 1 - with connectionsLock: - connections[str(threadId)] = (connection,address) - thread = inThread(threadId) - inThreads[str(threadId)] = thread - thread.start() + if ipClientCount + 1 > maxClientsPerIP: + print("Connection closed - same IP connected too many times.") + closeConnection(connectionId) + continue + + connectionsId += 1 + threadIn = inThread(str(connectionsId)) + connections[str(connectionsId)] = { + "connection": connection, + "address": address, + "inThread": threadIn, + "outThread": False, + "action": False, + "user": False + } + + threadIn.start() + addThread() if __name__ == '__main__': main() \ No newline at end of file