2021-04-13 20:06:16 +00:00
#!/usr/bin/env python3
import sys
# Keep a reference to whatever hook was installed before ours.
oldexcepthook = sys.excepthook


def newexcepthook(type, value, traceback):
    """Forward an uncaught exception to the previously installed hook.

    The parameter names mirror the ``sys.excepthook`` signature. The wrapper
    is the place where a "press ENTER" pause can be re-enabled (see the
    disabled line below).
    """
    oldexcepthook(type, value, traceback)
    # input("Press ENTER to quit.")


sys.excepthook = newexcepthook
import os
# Short aliases used throughout the script.
p = os.path.join
pUp = os.path.dirname

# s is the absolute path of the running program: the executable itself when
# sys.frozen and sys._MEIPASS are both present, otherwise this source file.
s = False
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
    s = os.path.realpath(sys.executable)
else:
    s = os.path.realpath(__file__)

# sp is the directory that contains s.
sp = pUp(s)
# script start
2021-04-13 18:33:16 +00:00
import threading
import queue
import socket
import subprocess
2021-04-14 01:12:13 +00:00
import configparser
2021-04-14 23:20:21 +00:00
import time
2021-04-13 18:33:16 +00:00
2021-04-13 23:09:34 +00:00
# Registries shared between threads; every key is str(threadId).
outThreads = {}
inThreads = {}
connections = {}  # str(threadId) -> (socket, address) as returned by accept()
threadId = 0  # Last connection id handed out by main().
threadsLock = threading.Lock()  # Held while touching outThreads / inThreads / threadId.
fileLock = threading.Lock()  # Held while reading a user's .ini file.
connectionsLock = threading.Lock()  # Held while touching connections.

serverAddr = ("127.0.0.1", 61920)
serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
bufferSize = 50000  # Buffer size in bytes
timeout = 15  # How long to wait for a connection to respond before timing out?
maxClients = 20  # How many clients can be connected at maximum?
maxClientsPerIP = 3  # How many clients can be connected at maximum, per IP?
# How much data can be in an outbound thread's queue at maximum before the
# connection is closed? The maximum amount of queue data is
# maxAccumulatedData * maxClients bytes.
maxAccumulatedData = 50 * 1000 * 1000
# Maximum upload speed for broadcasters. This sucks right now. Set to 0 to disable.
maxInboundTransferRate = 0
2021-04-13 18:33:16 +00:00
2021-04-13 23:09:34 +00:00
def commandToList(cmd):
    """Split a comma-separated command string into its arguments.

    A backslash escapes the next character, so ``\\,`` yields a literal comma
    and ``\\\\`` a literal backslash. Empty arguments between commas are
    skipped, but whatever follows the last comma is always appended, even
    when it is empty (so ``""`` gives ``[""]`` and ``"a,"`` gives
    ``["a", ""]``).

    Args:
        cmd: The raw command text.

    Returns:
        The list of argument strings.
    """
    args = []
    current = []
    escape = False
    for letter in cmd:
        if escape:
            # Previous character was a backslash: take this one verbatim.
            current.append(letter)
            escape = False
        elif letter == "\\":
            escape = True
        elif letter == ",":
            if current:
                args.append("".join(current))
                current = []
        else:
            current.append(letter)
    args.append("".join(current))
    return args
2021-04-13 18:33:16 +00:00
class outThread(threading.Thread):
    """Sender thread: drains a queue of payloads into one client's socket.

    Queue items are either bytes to send, or a ``(callable, args, kwargs)``
    tuple that is invoked on this thread. Queuing ``(self.closeThread, [], {})``
    is how other threads ask this one to shut down.
    """

    def __init__(self, threadId, user):
        threading.Thread.__init__(self)
        self.threadId = threadId
        self.queue = queue.Queue()
        self.user = user
        # Set by the broadcaster once this thread's queue has overflowed.
        self.ignore = False

    def closeConnection(self):
        """Close and unregister this thread's socket, if it is still registered."""
        key = str(self.threadId)
        with connectionsLock:
            if key in connections:
                try:
                    connections[key][0].close()
                except Exception:
                    # Best effort: the entry is dropped either way.
                    print(" warning, closing connection failed ")
                del connections[key]

    def getConnection(self):
        """Return the registered ``(socket, address)`` tuple, or False if gone."""
        with connectionsLock:
            return connections.get(str(self.threadId), False)

    def closeThread(self):
        """Close the socket and remove this thread from ``outThreads``.

        Safe to call more than once; both steps are skipped when already done.
        """
        with threadsLock:
            self.closeConnection()
            outThreads.pop(str(self.threadId), None)

    def run(self):
        """Process queue items until a close request, a lost connection or an error."""
        try:
            while True:
                data = self.queue.get()
                if isinstance(data, tuple):
                    data[0](*data[1], **data[2])
                    if data[0] == self.closeThread:
                        return
                    continue
                connection = self.getConnection()
                if not connection:
                    # The socket was closed and unregistered by another thread;
                    # there is nothing left to send to.
                    return
                connection[0].sendall(data)
        finally:
            # Runs on every exit path, including errors (which still propagate).
            self.closeThread()
2021-04-13 23:09:34 +00:00
class inThread(threading.Thread):
    """Per-connection receiver thread.

    Reads one handshake of the form ``<cmd>,<user>[,<password>]`` (trailing
    spaces are stripped) and then either hands the socket to an ``outThread``
    (``watch``) or relays everything the client sends to the queues of that
    user's ``outThread`` instances (``broadcast``).
    """

    def __init__(self, threadId):
        threading.Thread.__init__(self)
        self.threadId = threadId
        # Defined up front so closeThread() is safe before the handshake is parsed.
        self.user = None
        # True once this connection has authenticated as a broadcaster. Only
        # then does closing it also disconnect the user's watchers.
        self.broadcasting = False

    def closeConnection(self):
        """Close and unregister this thread's socket, if it is still registered."""
        key = str(self.threadId)
        with connectionsLock:
            if key in connections:
                try:
                    connections[key][0].close()
                except Exception:
                    # Best effort: the entry is dropped either way.
                    print(" warning, closing connection failed ")
                del connections[key]

    def getConnection(self):
        """Return the registered ``(socket, address)`` tuple, or False if gone."""
        with connectionsLock:
            return connections.get(str(self.threadId), False)

    def closeThread(self, closeConnection=True):
        """Remove this thread from ``inThreads`` and optionally close its socket.

        Args:
            closeConnection: When True the socket is closed and, if this thread
                was an authenticated broadcaster, every ``outThread`` of the
                same user is asked to close as well. When False the socket is
                left registered (used when it was handed to an ``outThread``).
        """
        with threadsLock:
            if closeConnection:
                if self.broadcasting:
                    for thread in outThreads.values():
                        if thread.user == self.user:
                            thread.queue.put((thread.closeThread, [], {}))
                self.closeConnection()
            inThreads.pop(str(self.threadId), None)

    def _loadUserConfig(self, user):
        """Return the parsed ``users/<user>.ini``, or None if unsafe or missing."""
        # The name comes straight from the network: accept plain file names
        # only, so it cannot point outside the users directory.
        if (not user or user in (".", "..") or "/" in user or "\\" in user
                or os.path.basename(user) != user):
            return None
        userPath = p(sp, "users", user + ".ini")
        with fileLock:
            if not os.path.isfile(userPath):
                return None
            config = configparser.ConfigParser()
            config.read(userPath)
        return config

    @staticmethod
    def _passwordRejected(config, cmd, password):
        """Return True when ``[cmd]`` sets a non-empty ``pass`` that differs from password."""
        if cmd not in config:
            return False
        section = config[cmd]
        if "pass" not in section:
            return False
        expected = section["pass"]
        return expected != "" and expected != password

    def _startWatching(self):
        """Hand the socket over to a new ``outThread`` and retire this thread."""
        with threadsLock:
            thread = outThread(self.threadId, self.user)
            outThreads[str(self.threadId)] = thread
            thread.start()
        # Keep the connection registered: the outThread now owns it.
        self.closeThread(False)

    def _throttle(self, byteCount, startTime):
        """Sleep so byteCount bytes since startTime respect maxInboundTransferRate.

        Returns the timestamp at which the next measuring window starts.
        """
        elapsed = time.time() - startTime
        required = byteCount / maxInboundTransferRate
        if required > elapsed:
            time.sleep(required - elapsed)
        return time.time()

    def _relayBroadcast(self, user):
        """Forward received data to the user's watchers until the stream ends."""
        # NOTE(review): this drops every other inbound connection, not only
        # those of the same user - confirm that a single active broadcaster
        # for the whole server is intended.
        ownKey = str(self.threadId)
        with threadsLock:
            for key in [k for k in inThreads if k != ownKey]:
                inThreads[key].closeConnection()
                del inThreads[key]

        startTime = time.time()
        while True:
            connection = self.getConnection()
            if not connection:
                return
            data = connection[0].recv(bufferSize)
            if data == b"":
                return

            with threadsLock:
                for thread in outThreads.values():
                    if thread.user != user or thread.ignore:
                        # An ignored watcher is already being shut down; do
                        # not keep feeding its queue.
                        continue
                    # Upper bound: assumes every queued item is a full buffer.
                    accumulatedData = thread.queue.qsize() * bufferSize
                    if accumulatedData > maxAccumulatedData:
                        thread.ignore = True
                        thread.queue.put((thread.closeThread, [], {}))
                        thread.closeConnection()
                    else:
                        thread.queue.put(data)

            if maxInboundTransferRate > 0:
                startTime = self._throttle(len(data), startTime)

    def run(self):
        """Read the handshake, authenticate, then serve the requested command."""
        try:
            connection = self.getConnection()
            if not connection:
                self.closeThread()
                return
            # NOTE(review): a single recv() may return only part of the
            # handshake; presumably clients send it in one small message -
            # confirm against the client.
            data = connection[0].recv(1000)
            if data == b"":
                self.closeThread()
                return
            args = commandToList(data.decode("utf-8").rstrip(" "))
            cmd = args.pop(0)
            if not args:
                # No user name supplied.
                self.closeThread()
                return
            user = args[0]
            self.user = user
            password = args[1] if len(args) > 1 else False

            config = self._loadUserConfig(user)
            if config is None or self._passwordRejected(config, cmd, password):
                self.closeThread()
                return

            if cmd == "watch":
                self._startWatching()
                return
            if cmd == "broadcast":
                self.broadcasting = True
                self._relayBroadcast(user)
            # Broadcast finished, or the command was not recognised.
            self.closeThread()
            return
        except BaseException:
            # Always release the connection slot, then let the error surface.
            self.closeThread()
            raise
2021-04-13 18:33:16 +00:00
2021-04-14 03:12:49 +00:00
class debugThread(threading.Thread):
    """Diagnostic thread: prints thread, queue and connection counts every second."""

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        """Print a status report, sleep one second, repeat forever."""
        while True:
            with threadsLock:
                print(" \n --- \n ")
                print(" Threads - IN: " + str(len(inThreads)))
                print(" Threads - OUT: " + str(len(outThreads)))

                print(" \n ACCUMULATED DATA: ")
                for threadId, thread in outThreads.items():
                    # Upper bound: assumes every queued item is a full buffer.
                    print(threadId + " : " + str(thread.queue.qsize() * bufferSize))

                print(" \n CONNECTIONS: ")
                connCountIp = {}
                with connectionsLock:
                    connCount = len(connections)
                    for conn in connections.values():
                        ip = conn[1][0]
                        connCountIp[ip] = connCountIp.get(ip, 0) + 1
                for ip, count in connCountIp.items():
                    print(ip + " : " + str(count))
                print(" Overall: " + str(connCount))
            # Sleep outside the locks so the report never stalls the server.
            time.sleep(1)
2021-04-14 03:12:49 +00:00
2021-04-15 18:54:38 +00:00
def readConfig():
    """Load ``serverAddr`` from the ``.ini`` file that sits next to the script.

    The file has the script's path with its extension replaced by ``.ini``.
    The address is read from the ``serverAddr`` option of the ``[default]``
    section, written as ``host:port``. When the file, the section or the
    option is absent, the built-in ``serverAddr`` is left untouched.

    Raises:
        ValueError: If the option has no ``:port`` part or the port is not
            an integer.
    """
    global serverAddr
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files that do not exist.
    config.read(os.path.splitext(s)[0] + ".ini")
    if "default" not in config or "serverAddr" not in config["default"]:
        return
    raw = config["default"]["serverAddr"]
    # Split on the last colon so the host part may itself contain colons.
    host, separator, port = raw.rpartition(":")
    if not separator:
        raise ValueError("serverAddr must look like host:port, got " + repr(raw))
    serverAddr = (host, int(port))
2021-04-13 18:33:16 +00:00
def main():
    """Bind the listening socket and accept clients forever.

    Every accepted connection gets the configured timeout. It is closed
    right away when accepting it would exceed ``maxClients`` overall or
    ``maxClientsPerIP`` for its address; otherwise it is registered under a
    fresh id and served by a new ``inThread``.
    """
    global threadId
    readConfig()
    serverSocket.bind(serverAddr)
    serverSocket.listen(1024)

    # DEBUG
    debug = debugThread()
    debug.start()
    # DEBUG END

    while True:
        connection, address = serverSocket.accept()
        connection.settimeout(timeout)

        with connectionsLock:
            clientCount = len(connections)
            ipClientCount = sum(
                1 for conn in connections.values() if conn[1][0] == address[0]
            )
        if clientCount + 1 > maxClients or ipClientCount + 1 > maxClientsPerIP:
            connection.close()
            continue

        with threadsLock:
            threadId += 1
            with connectionsLock:
                connections[str(threadId)] = (connection, address)

            thread = inThread(threadId)
            inThreads[str(threadId)] = thread
            thread.start()
2021-04-13 20:06:16 +00:00
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()