Server Remastered

This commit is contained in:
Fierelier 2021-04-22 04:29:12 +02:00
parent bf8505554f
commit eb05c89d47

View File

@ -25,14 +25,15 @@ import subprocess
import configparser
import time
outThreads = {}
inThreads = {}
connections = {}
threadId = 0
threadsLock = threading.Lock()
fileLock = threading.Lock()
connectionsId = 0
connectionsLock = threading.Lock()
threadCount = 0
threadCountLock = threading.Lock()
fileLock = threading.Lock()
serverAddr = ("127.0.0.1",61920)
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -43,6 +44,18 @@ maxClientsPerIP = 3 # How many clients can be connected at maximum, per IP?
maxAccumulatedData = 50*1000*1000 # How much data can be in an outbound thread's queue at maximum before the connection is closed? The maximum amount of queue data is maxAccumulatedData * maxClients bytes.
maxInboundTransferRate = 0 # Maximum upload speed for broadcasters. This sucks right now. Set to 0 to disable.
def addThread():
global threadCount
with threadCountLock:
threadCount += 1
print("Thread opened. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")")
def removeThread():
global threadCount
with threadCountLock:
threadCount -= 1
print("Thread closed. Threads: " +str(threadCount)+ " (Actual: " +str(threading.active_count())+ ")")
def commandToList(cmd):
args = []
cArg = ""
@ -75,87 +88,62 @@ def commandToList(cmd):
return args
class outThread(threading.Thread):
def __init__(self,threadId,user):
threading.Thread.__init__(self)
self.threadId = threadId
self.queue = queue.Queue()
self.user = user
self.ignore = False
def closeConnection(connectionId):
if connectionId in connections:
del connections[connectionId]
def closeConnection(self):
with connectionsLock:
if str(self.threadId) in connections:
try:
connections[str(self.threadId)][0].close()
except Exception as e:
print("Warning, closing connection failed: " +str(e))
del connections[str(self.threadId)]
class outThread(threading.Thread):
def __init__(self,connectionId):
threading.Thread.__init__(self)
self.queue = queue.Queue()
self.connectionId = connectionId
def getConnection(self):
with connectionsLock:
if str(self.threadId) in connections:
return connections[str(self.threadId)]
return False
def closeThread(self):
with threadsLock:
self.closeConnection()
if str(self.threadId) in outThreads: del outThreads[str(self.threadId)]
if self.connectionId in connections:
return connections[self.connectionId]["connection"]
return False
def run(self):
try:
while True:
data = self.queue.get(timeout=15)
if type(data) == tuple:
data[0](*data[1],**data[2])
if data[0] == self.closeThread: return
continue
self.getConnection()[0].sendall(data)
self.closeThread()
connection = self.getConnection()
if not connection:
removeThread()
return
connection.sendall(data)
with connectionsLock: closeConnection(self.connectionId)
removeThread()
return
except:
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
removeThread()
print("Thread closed - Exception:")
raise
class inThread(threading.Thread):
def __init__(self,threadId):
def __init__(self,connectionId):
threading.Thread.__init__(self)
self.threadId = threadId
def closeConnection(self):
with connectionsLock:
if str(self.threadId) in connections:
try:
connections[str(self.threadId)][0].close()
except:
print("warning, closing connection failed")
del connections[str(self.threadId)]
self.connectionId = connectionId
def getConnection(self):
with connectionsLock:
if str(self.threadId) in connections:
return connections[str(self.threadId)]
def closeThread(self,closeConnection = True):
with threadsLock:
if closeConnection:
for thread in outThreads:
thread = outThreads[thread]
if thread.user == self.user:
thread.queue.put((thread.closeThread,[],{}))
self.closeConnection()
if str(self.threadId) in inThreads: del inThreads[str(self.threadId)]
if self.connectionId in connections:
return connections[self.connectionId]["connection"]
return False
def run(self):
try:
data = self.getConnection()[0].recv(1000)
connection = self.getConnection()
if not connection:
removeThread()
return
data = connection.recv(1000)
if data == b"":
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
print("Thread closed - Client disconnected.")
removeThread()
return
data = data.decode("utf-8")
while data[-1] == " ": data = data[:-1]
@ -171,8 +159,9 @@ class inThread(threading.Thread):
userPath = p(sp,"users",user+ ".ini")
with fileLock:
if not os.path.isfile(userPath):
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
print("Thread closed - Invalid user given: " +user)
removeThread()
return
config = configparser.ConfigParser()
@ -180,55 +169,73 @@ class inThread(threading.Thread):
if cmd == "watch":
if cmd in config and "pass" in config[cmd] and config[cmd]["pass"] != "" and config[cmd]["pass"] != password:
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
print("Thread closed - Invalid password given for user: " +user)
removeThread()
return
with threadsLock:
thread = outThread(self.threadId,self.user)
outThreads[str(self.threadId)] = thread
with connectionsLock:
if not self.connectionId in connections:
removeThread()
return
connections[self.connectionId]["action"] = "watch"
connections[self.connectionId]["user"] = user
thread = outThread(self.connectionId)
connections[self.connectionId]["outThread"] = thread
thread.start()
self.closeThread(False)
return
addThread()
connections[self.connectionId]["inThread"] = False
removeThread()
return
if cmd == "broadcast":
if cmd in config and "pass" in config[cmd] and config[cmd]["pass"] != "" and config[cmd]["pass"] != password:
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
print("Thread closed - Invalid password given for user: " +user)
removeThread()
return
with threadsLock:
with connectionsLock:
deleteList = []
for threadId in inThreads:
if threadId == str(self.threadId): continue
thread = inThreads[threadId]
thread.closeConnection()
deleteList.append(threadId)
for connectionId in connections:
if connectionId == self.connectionId: continue
if connections[connectionId]["action"] != "broadcast": continue
if connections[connectionId]["user"] != user: continue
deleteList.append(connectionId)
for threadId in deleteList:
del inThreads[threadId]
for connectionId in deleteList:
closeConnection[connectionId]
connections[connectionId]["action"] = "broadcast"
connections[connectionId]["user"] = user
if maxInboundTransferRate > 0: startTime = time.time()
while True:
data = self.getConnection()[0].recv(bufferSize)
connection = self.getConnection()
if not connection:
removeThread()
return
data = connection.recv(bufferSize)
if data == b"":
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
print("Thread closed - Client disconnected.")
removeThread()
return
with threadsLock:
for thread in outThreads:
thread = outThreads[thread]
if thread.user == user:
accumulatedData = thread.queue.qsize() * bufferSize
if accumulatedData > maxAccumulatedData and not thread.ignore:
thread.ignore = True
thread.queue.put((thread.closeThread,[],{}))
thread.closeConnection()
print("Thread closed - Too much accumulated data.")
else:
thread.queue.put(data)
with connectionsLock:
for connectionId in connections:
connectionData = connections[connectionId]
thread = connectionData["outThread"]
if not thread: continue
if connectionData["user"] != user: continue
accumulatedData = thread.queue.qsize() * bufferSize
if accumulatedData > maxAccumulatedData:
print("Thread closed - Too much accumulated data.")
closeConnection(connectionId)
removeThread()
return
thread.queue.put(data)
if maxInboundTransferRate > 0:
endTime = time.time()
@ -244,11 +251,13 @@ class inThread(threading.Thread):
startTime = endTime
self.closeThread()
with connectionsLock: closeConnection(self.connectionId)
removeThread()
return
except:
with connectionsLock: closeConnection(self.connectionId)
removeThread()
print("Thread closed - Exception:")
self.closeThread()
raise
class debugThread(threading.Thread):
@ -257,7 +266,7 @@ class debugThread(threading.Thread):
def run(self):
while True:
with threadsLock:
with connectionsLock:
print("\n---\n")
print("Threads - IN: " +str(len(inThreads)))
print("Threads - OUT: " +str(len(outThreads)))
@ -294,15 +303,15 @@ def readConfig():
serverAddr = tuple(serverAddrSplit)
def main():
global threadId
global connectionsId
readConfig()
serverSocket.bind(serverAddr)
serverSocket.listen(1024)
# DEBUG
debug = debugThread()
debug.start()
#debug = debugThread()
#debug.start()
# DEBUG END
while True:
@ -312,25 +321,34 @@ def main():
with connectionsLock:
clientCount = 0
ipClientCount = 0
for connId in connections:
for connectionId in connections:
clientCount += 1
conn = connections[connId]
if conn[1][0] == address[0]:
if connections[connectionId]["address"][0] == address[0]:
ipClientCount += 1
if clientCount + 1 > maxClients or ipClientCount + 1 > maxClientsPerIP:
print("Connection closed - same IP connected too many times.")
connection.close()
if clientCount + 1 > maxClients:
print("Connection closed - too many clients.")
closeConnection(connectionId)
continue
with threadsLock:
threadId += 1
with connectionsLock:
connections[str(threadId)] = (connection,address)
if ipClientCount + 1 > maxClientsPerIP:
print("Connection closed - same IP connected too many times.")
closeConnection(connectionId)
continue
thread = inThread(threadId)
inThreads[str(threadId)] = thread
thread.start()
connectionsId += 1
threadIn = inThread(str(connectionsId))
connections[str(connectionsId)] = {
"connection": connection,
"address": address,
"inThread": threadIn,
"outThread": False,
"action": False,
"user": False
}
threadIn.start()
addThread()
if __name__ == '__main__':
main()