fstream/server.py

284 lines
5.9 KiB
Python

#!/usr/bin/env python3
import sys
# Keep a reference to the hook that is installed at import time so the
# replacement below can delegate to it.
oldexcepthook = sys.excepthook
def newexcepthook(type,value,traceback):
    """Forward an uncaught exception to the previously installed hook.

    All three arguments are passed through unchanged to ``oldexcepthook``.
    """
    oldexcepthook(type,value,traceback)
    # Disabled pause prompt, kept for reference.
    #input("Press ENTER to quit.")
# Install the wrapper as the handler for uncaught exceptions.
sys.excepthook = newexcepthook
import os
# Short aliases for the two os.path helpers used by the rest of the module.
p = os.path.join
pUp = os.path.dirname
# ``s`` is the resolved path of the running program: the executable when both
# ``sys.frozen`` and ``sys._MEIPASS`` are present, otherwise this source file.
s = os.path.realpath(
    sys.executable
    if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
    else __file__
)
# ``sp`` is the directory that contains ``s``.
sp = pUp(s)
import threading
import queue
import socket
import configparser
# When true, main() also starts the statistics-printing debugThread.
debug = True
# Size in bytes. Note: 8096 is slightly less than 8 KiB (8192).
# Not referenced in the visible part of this file.
bufferSize = 8096
# Timeout in seconds; a watcher thread waits this long for queued data.
timeout = 15
# The four limits below are not referenced in the visible part of this file.
maxClients = 100
maxClientsPerIP = 10
maxBroadcastsPerIP = 3
maxAccumulatedData = 10 * 1024 * 1024 # 10 MiB
# (host, port) pairs; main() opens one listening socket per entry.
addresses = [("127.0.0.1", 61920)]
# Directory of the per-user "<user>.ini" files read by handleConnection()
# when a broadcaster logs in.
userPath = p(sp,"users")
# Registry of connected clients: str(client id) -> env dict describing the
# connection. Accessed under clientsLock.
clients = {}
# Most recently issued client id; incremented for every registered client.
clientsID = 0
# Guards ``clients`` (and id allocation) across the worker threads.
clientsLock = threading.Lock()
# Accepted (connection, address) pairs produced by socketThread and consumed
# by main().
pendingConnections = queue.Queue()
def move (y, x):
    """Print the ANSI cursor-position escape sequence for row ``y``, column ``x``.

    The sequence is emitted with ``print``, so the usual trailing newline
    follows it.
    """
    sequence = "\033[%d;%dH" % (y, x)
    print(sequence)
def listToCommand(lst):
    """Serialize a sequence of string arguments into one command string.

    Arguments are separated by commas. Inside each argument a backslash is
    written as two backslashes and a comma as backslash-comma, so the
    separators stay unambiguous. An empty sequence yields an empty string.
    """
    escaped = (
        item.replace("\\", "\\\\").replace(",", "\\,")
        for item in lst
    )
    return ",".join(escaped)
def commandToList(cmd):
    """Split a command string into its list of arguments.

    A comma ends the current argument, and a backslash makes the following
    character literal. Details of the format as implemented:

    * an empty argument before a comma is skipped rather than stored;
    * whatever has been collected when the input ends is always appended,
      even if it is empty (so ``""`` gives ``[""]``);
    * a backslash that is the very last character is dropped.
    """
    pieces = []
    current = []
    letters = iter(cmd)
    for ch in letters:
        if ch == "\\":
            # Consume the escaped character directly from the iterator.
            literal = next(letters, None)
            if literal is not None:
                current.append(literal)
        elif ch == ",":
            if current:
                pieces.append("".join(current))
                current = []
        else:
            current.append(ch)
    pieces.append("".join(current))
    return pieces
def makePayload(lst, size=1024):
    """Encode a command list as a fixed-size, space-padded UTF-8 frame.

    The arguments are serialized with ``listToCommand``, encoded as UTF-8 and
    right-padded with ASCII spaces to exactly ``size`` bytes. The handshake
    in this file reads headers with ``recv(1024)`` and strips trailing
    spaces, which is why the default is 1024.

    Args:
        lst: sequence of string arguments.
        size: frame length in bytes (defaults to the 1024-byte header size).

    Returns:
        A ``bytes`` object of length ``size``.

    Raises:
        ValueError: if the encoded command does not fit in ``size`` bytes.
            Previously an oversized command was returned unpadded and longer
            than the frame, which silently breaks fixed-size framing.
    """
    cmdBytes = listToCommand(lst).encode("utf-8")
    if len(cmdBytes) > size:
        raise ValueError(
            "command is %d bytes, exceeds the %d-byte frame"
            % (len(cmdBytes), size)
        )
    return cmdBytes.ljust(size, b" ")
class socketThread(threading.Thread):
    """Listener thread for one (host, port) address.

    The TCP socket is created and bound in the constructor. ``run`` then
    accepts connections forever and hands each ``(connection, address)`` pair
    to ``pendingConnections``.
    """

    def __init__(self,ip):
        super().__init__()
        self.ip = ip
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.ip)

    def run(self):
        """Accept connections in an endless loop, queueing each one."""
        self.socket.listen(32766)
        while True:
            accepted = None
            try:
                accepted, peer = self.socket.accept()
                pendingConnections.put((accepted, peer))
            except Exception as err:
                # Best effort: close the connection if one was accepted
                # before the failure, then report the error and keep going.
                if accepted is not None:
                    try:
                        accepted.close()
                    except BaseException:
                        pass
                print(err)
class clientThread(threading.Thread):
    """Worker thread serving one registered client connection.

    ``env`` is the dict describing the client. This class reads its
    "command", "connection" and "bsize" keys. ``queue`` is this client's
    inbox: broadcaster threads put received data into the inboxes of the
    watchers attached to them.
    """

    def __init__(self,clientID,env):
        threading.Thread.__init__(self)
        self.clientID = clientID
        self.env = env
        self.queue = queue.Queue()

    def childRun(self):
        """Run the watch or broadcast loop selected by ``env["command"]``.

        watch: send the broadcaster's block size as a header frame, then
        forward everything that arrives in ``self.queue`` to the socket.
        ``queue.Empty`` is raised (ending the thread via ``run``) when no
        data arrives within ``timeout`` seconds.

        broadcast: send a placeholder header frame, then read from the socket
        and copy each chunk into the inbox of every watcher whose "client"
        is this thread's id. Returns when the peer closes the connection.

        Any other command returns immediately.
        """
        connection = self.env["connection"]
        command = self.env["command"]
        if command == "watch":
            connection.sendall(makePayload([str(self.env["bsize"])]))
            while True:
                chunks = [self.queue.get(timeout=timeout)]
                # Drain whatever else is already queued so it goes out in a
                # single sendall. The exception lives in the ``queue``
                # module; the name ``Queue`` is not defined in this file.
                while True:
                    try:
                        chunks.append(self.queue.get(False))
                    except queue.Empty:
                        break
                connection.sendall(b"".join(chunks))
        if command == "broadcast":
            connection.sendall(makePayload(["dummy"]))
            bsize = self.env["bsize"]
            ownID = str(self.clientID)
            while True:
                data = connection.recv(bsize)
                if data == b"":
                    return
                with clientsLock:
                    for other in clients.values():
                        # Entries lacking a key are skipped instead of
                        # raising, as in the original best-effort loop.
                        if other.get("command") != "watch":
                            continue
                        if other.get("client") != ownID:
                            continue
                        watcher = other.get("thread")
                        if watcher is not None:
                            watcher.queue.put(data)

    def run(self):
        """Run ``childRun`` and always close the socket and deregister."""
        try:
            self.childRun()
        except Exception as e:
            try:
                print(e)
            except Exception:
                print("Could not print exception.")
        finally:
            try:
                self.env["connection"].close()
            except Exception:
                # Best effort: the connection may already be unusable.
                pass
            with clientsLock:
                clients.pop(str(self.clientID), None)
def _recvHandshake(connection, size=1024):
    """Read exactly ``size`` header bytes; return None if the peer closes first.

    A single ``recv`` may return fewer bytes than requested, so the read is
    repeated until the frame is complete. The whole read is bounded by the
    module-level ``timeout`` so a silent peer cannot stall the caller
    indefinitely. The socket is put back into blocking mode afterwards.

    Raises:
        socket.timeout: if the header is not complete within ``timeout``
            seconds.
    """
    import time  # local import keeps this block self-contained
    deadline = time.monotonic() + timeout
    header = b""
    try:
        while len(header) < size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise socket.timeout("handshake timed out")
            connection.settimeout(remaining)
            chunk = connection.recv(size - len(header))
            if chunk == b"":
                return None
            header += chunk
    finally:
        connection.settimeout(None)
    return header


def _registerClientLocked(env):
    """Allocate the next client id, start its thread and register ``env``.

    The caller must hold ``clientsLock``. The thread receives a copy of
    ``env`` taken before the "thread" key is added. The thread is started
    before the registry entry is written, so a failed ``start`` leaves no
    stale entry. The thread's own deregistration takes ``clientsLock`` and
    therefore cannot run before the entry exists.
    """
    global clientsID
    clientsID += 1
    thr = clientThread(clientsID, env.copy())
    env["thread"] = thr
    thr.start()
    clients[str(clientsID)] = env
    return thr


def handleConnection(connection,address):
    """Read a new connection's 1024-byte header and register the client.

    The header is a space-padded command string (see ``commandToList``):

    * ``watch,user,room,passw``: attach to the first registered broadcaster
      with the same user, room and room password. The watcher inherits that
      broadcaster's block size.
    * ``broadcast,bsize,user,userPassw,room,roomPassw``: check ``userPassw``
      against the ``password`` key in the DEFAULT section of
      ``<userPath>/<user>.ini`` and register a broadcaster.

    The connection is closed on an incomplete header, an unknown command, no
    matching broadcaster, a non-positive ``bsize``, a ``user`` containing a
    path component, or a missing or wrong password.

    Raises:
        socket.timeout, UnicodeDecodeError, IndexError, ValueError: for a
            stalled or malformed handshake. The connection is not closed in
            that case; closing it is left to the caller.
    """
    header = _recvHandshake(connection)
    if header is None:
        connection.close()
        return
    args = commandToList(header.decode("utf-8").rstrip(" "))
    command = args[0]
    if command == "watch":
        user = args[1]
        room = args[2]
        passw = args[3]
        # Lookup and registration share one critical section so the matched
        # broadcaster cannot disappear in between.
        with clientsLock:
            watchID = None
            for clientID, other in clients.items():
                if other["command"] != "broadcast": continue
                if other["user"] != user: continue
                if other["room"] != room: continue
                if other["passw"] != passw: continue
                watchID = clientID
                break
            if watchID is not None:
                _registerClientLocked({
                    "command": "watch",
                    "client": watchID,
                    "connection": connection,
                    "address": address,
                    "bsize": clients[watchID]["bsize"],
                })
                return
        connection.close()
        return
    if command == "broadcast":
        bsize = int(args[1])
        user = args[2]
        userPassw = args[3]
        room = args[4]
        roomPassw = args[5]
        # ``user`` comes from the network and is used to build a file path:
        # reject anything that is not a bare file name (path traversal).
        # A non-positive block size cannot be used for recv().
        if bsize <= 0 or os.path.basename(user) != user:
            connection.close()
            return
        config = configparser.ConfigParser()
        config.read(p(userPath,user+ ".ini"))
        # A missing file or key yields None instead of a KeyError.
        expected = config["DEFAULT"].get("password")
        if expected is None or expected != userPassw:
            connection.close()
            return
        env = {
            "command": "broadcast",
            "user": user,
            "room": room,
            "passw": roomPassw,
            "bsize": bsize,
            "connection": connection,
            "address": address,
        }
        with clientsLock:
            _registerClientLocked(env)
        return
    connection.close()
class debugThread(threading.Thread):
    """Prints thread, client and queued-data statistics once per second."""

    def __init__(self):
        super().__init__()
        # Bind ``time`` as a module-level name so run() can use it.
        global time
        import time

    def run(self):
        """Report the counters in an endless loop, sleeping 1 s between reports."""
        while True:
            queuedMB = 0
            liveThreads = threading.active_count()
            with clientsLock:
                registered = len(clients)
                for entry in clients.values():
                    # Estimate: queued items times the client's block size,
                    # expressed in units of 1,000,000 bytes.
                    pending = entry["thread"].queue.qsize()
                    queuedMB += (pending * entry["bsize"]) / 1000000
            #move(0,0)
            print(" --- ")
            print(f" Threads: {liveThreads} ")
            print(f" Clients: {registered} ")
            print(f" Accumulated Data: {queuedMB}MB ")
            time.sleep(1)
def main():
    """Start the listeners (and the debug thread), then serve handshakes forever.

    One ``socketThread`` is started per entry in ``addresses``. Accepted
    connections are then taken from ``pendingConnections`` one at a time and
    passed to ``handleConnection``. If that raises, the connection is closed
    and the error printed, and the loop carries on.
    """
    for address in addresses:
        banner = "".join(
            ("Opening TCP socket ", address[0], ":", str(address[1]), "...")
        )
        print(banner)
        listener = socketThread(address)
        listener.start()
    print("Serving!")
    if debug == True:
        reporter = debugThread()
        reporter.start()
    while True:
        connection, address = pendingConnections.get()
        try:
            handleConnection(connection, address)
        except Exception as err:
            # Best effort: drop the connection, then report what went wrong.
            try:
                connection.close()
            except BaseException:
                pass
            try:
                print(err)
            except BaseException:
                print("Could not print exception.")


# Run the server only when executed as a script.
if __name__ == '__main__':
    main()