me.fier.tcpserver/module/server/_main.py

236 lines
6.1 KiB
Python

import sys
import os
import toml
import threading
import queue
import traceback
import socket
import ssl
# Registry of server threads, keyed by server name (filled in by main()).
servers = {}
# Held by code in this file while it reads or modifies the `servers` registry
# after startup.
lockServers = threading.Lock()
# debugOutput() prints only while this flag is true.
debug = True
# Held around the print() call in debugOutput().
debugLock = threading.Lock()
def debugOutput(*args, **kwargs):
    """Print a debug message if the module-level ``debug`` flag is set.

    All positional and keyword arguments are forwarded unchanged to
    ``print``. The call to ``print`` is made while holding ``debugLock``.

    Errors raised while printing are deliberately ignored: debug output is
    best-effort and must not break the caller. Only ``Exception`` subclasses
    are suppressed, so ``KeyboardInterrupt`` and ``SystemExit`` still
    propagate.
    """
    if not debug:
        return
    try:
        with debugLock:
            print(*args, **kwargs)
    except Exception:
        # Best-effort logging: a failing output stream is not worth crashing for.
        pass
def printException():
    """Send the traceback of the exception being handled to the debug output.

    The formatted traceback is followed by a ``---`` separator line.
    """
    report = f"{traceback.format_exc()}\n---"
    debugOutput(report)
def kickClient(client):
    """Close a client's connection and drop it from its server's client list.

    *client* is expected to expose ``cClient`` (with ``address`` and
    ``connection``) and ``cServer`` (with a ``cClients`` list). Both steps
    are attempted independently; a failure in either is reported through
    printException() and does not stop the other.
    """
    peer = client.cClient
    debugOutput("Kicking client:", peer.address)

    # Step 1: close the connection.
    try:
        peer.connection.close()
    except Exception:
        printException()

    # Step 2: remove the entry from the owning server's list.
    try:
        roster = client.cServer.cClients
        position = roster.index(client)
        del roster[position]
    except Exception:
        printException()
class clientThread(threading.Thread):
    """Thread that runs a server's ``_client`` handler for one client.

    Attributes:
        cServer: the owning server object; this class reads its ``cGlb``
            and ``cLockClients`` attributes.
        cClient: the client record handed over by the server.
    """

    def __init__(self, server, client, *args, **kwargs):
        """Store the owning *server* and the *client* record.

        Extra positional and keyword arguments go to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.cServer = server
        self.cClient = client

    @staticmethod
    def _countAll(registry):
        """Return ``(serverCount, socketCount, clientCount)`` for *registry*.

        *registry* maps server names to server objects. Socket and client
        counts are summed over every server; each server's list is measured
        while holding that server's matching lock. The caller is responsible
        for any locking of *registry* itself.
        """
        socketCount = 0
        clientCount = 0
        for server in registry.values():
            with server.cLockSockets:
                socketCount += len(server.cSockets)
            with server.cLockClients:
                clientCount += len(server.cClients)
        return len(registry), socketCount, clientCount

    def run(self):
        """Run the handler, then disconnect the client and log statistics."""
        try:
            self.cServer.cGlb._client(self.cServer, self)
        except Exception:
            printException()

        # Whether the handler returned or raised, the client is removed.
        with self.cServer.cLockClients:
            kickClient(self)

        if debug:
            # The client lock was released above; _countAll() acquires each
            # server's client lock again, including this server's.
            with lockServers:
                serverCount, socketCount, clientCount = self._countAll(servers)
            debugOutput("* serverCount:", serverCount)
            debugOutput("* socketCount:", socketCount)
            debugOutput("* clientCount:", clientCount)
            debugOutput("* realThreads:", threading.active_count())
def closeServer(server):
    """Shut *server* down and remove it from the ``servers`` registry.

    If the server's script namespace (``server.cGlb``) contains a
    ``_shutdown`` entry it is called first; when it returns a value equal
    to ``True`` the shutdown is abandoned and nothing else happens. An
    exception raised by ``_shutdown`` is reported and the shutdown goes on.

    Otherwise every client connection is closed (failures are reported and
    skipped) and the server's entry is deleted from ``servers``.

    This function takes no locks itself. The call site in this file holds
    ``lockServers`` and ``server.cLockClients`` around the call.

    Raises:
        KeyError: if the server is not present in ``servers``.
    """
    debugOutput("Closing server:", server.cServerName)
    if "_shutdown" in server.cGlb:
        try:
            if server.cGlb._shutdown() == True:
                return
        except Exception:
            printException()
    for client in server.cClients:
        try:
            # cClients holds client threads; the socket lives on the thread's
            # cClient record, not on the thread object itself.
            client.cClient.connection.close()
        except Exception:
            printException()
    del servers[server.cServerName]
class serverThread(threading.Thread):
    """Per-server dispatcher: turns queued clients into client threads.

    Attributes:
        cServerName: name the server is registered under.
        cConfig: the server's configuration object.
        cGlb: the server's script namespace.
        cSockets / cLockSockets: list of socket threads and its lock.
        cClients / cLockClients: list of client threads and its lock.
        cQueueClients: queue of accepted clients; ``None`` requests shutdown.
    """

    def __init__(self, serverName, config, glb, *args, **kwargs):
        """Create the server state; extra arguments go to ``threading.Thread``."""
        super().__init__(*args, **kwargs)
        self.cServerName = serverName
        self.cConfig = config
        self.cGlb = glb
        self.cLockSockets = threading.Lock()
        self.cSockets = []
        self.cLockClients = threading.Lock()
        self.cClients = []
        self.cQueueClients = queue.Queue()

    def run(self):
        """Serve the client queue until a ``None`` sentinel arrives.

        Each queued client gets its own clientThread, which is recorded in
        ``cClients`` and started. On ``None`` the server is closed via
        closeServer() and this thread ends.
        """
        while True:
            client = self.cQueueClients.get()
            if client is None:
                # Lock order: registry lock first, then this server's clients.
                with lockServers:
                    with self.cLockClients:
                        closeServer(self)
                return
            with self.cLockClients:
                clientThr = clientThread(self, client)
                self.cClients.append(clientThr)
                clientThr.start()
class socketThread(threading.Thread):
    """Accept loop for one listening socket.

    Accepted connections are wrapped in a client record, given the
    configured timeout, optionally filtered and TLS-handshaked, and then
    queued on the owning server. Any error outside the filter/handshake
    step closes the listening socket, removes this thread from the
    server's socket list and ends the thread.
    """

    def __init__(self, server, config, socket, *args, **kwargs):
        """Remember the owning *server*, socket *config* and listening *socket*."""
        super().__init__(*args, **kwargs)
        self.cConfig = config
        self.cSocket = socket
        self.cServer = server

    def _dropPeer(self, peer):
        """Close *peer*'s connection, reporting (not raising) any failure."""
        try:
            peer.connection.close()
        except Exception:
            printException()

    def _admit(self, peer):
        """Apply the optional ``_filter`` hook and the TLS handshake to *peer*.

        Returns True when the peer may be queued. Returns False, after
        closing the peer's connection, when the filter returned a value
        equal to ``True`` or when the filter or handshake raised.
        """
        try:
            hooks = self.cServer.cGlb
            if "_filter" in hooks and hooks._filter(peer) == True:
                self._dropPeer(peer)
                return False
            if self.cConfig._ssl:
                peer.connection.do_handshake()
        except Exception:
            printException()
            self._dropPeer(peer)
            return False
        return True

    def run(self):
        """Accept connections until the listening socket fails."""
        while True:
            try:
                connection, address = self.cSocket.accept()
                peer = mfp.Bunch()
                peer.connection = connection
                peer.address = address
                peer.connection.settimeout(self.cConfig._timeout)
                if self._admit(peer):
                    self.cServer.cQueueClients.put(peer)
            except Exception:
                printException()
                try:
                    self.cSocket.close()
                except Exception:
                    printException()
                # Unregister this thread from its server before exiting.
                with self.cServer.cLockSockets:
                    siblings = self.cServer.cSockets
                    del siblings[siblings.index(self)]
                return
def _sockOptValue(token):
    """Convert one configured socket-option token for ``setsockopt``.

    A token that parses as an integer is returned as ``int``; any other
    token is looked up as an attribute of the ``socket`` module.
    """
    try:
        return int(token)
    except (TypeError, ValueError):
        return getattr(socket, token)


def _openListener(config):
    """Create a listening TCP socket as described by *config*.

    Reads ``_options`` (sequences of string tokens passed to
    ``setsockopt``), ``_addr``, ``_port`` and ``_ssl`` from *config*. When
    ``_ssl`` is truthy it is used as the certificate chain file and the
    socket is wrapped for server-side TLS with the handshake deferred.
    Progress is printed along the way. Returns the listening socket.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print(">>> Socket options:")
    for opt in config._options:
        print(">>>>", ", ".join(opt))
        listener.setsockopt(*[_sockOptValue(token) for token in opt])
    print(">>> Address: ---", config._addr)
    print(">>> Port: ------", config._port)
    listener.bind((config._addr, config._port))
    print(">>> SSL: -------", not (config._ssl == False))
    if config._ssl:
        if sys.version_info >= (3, 10):
            proto = ssl.PROTOCOL_TLS_SERVER
        else:
            proto = ssl.PROTOCOL_TLS
        ctx = ssl.SSLContext(proto)
        ctx.check_hostname = False
        ctx.load_cert_chain(config._ssl)
        listener = ctx.wrap_socket(
            listener,
            server_side=True,
            do_handshake_on_connect=False,
        )
    listener.listen(65535)
    return listener


def main():
    """Start every configured server, then every configured socket.

    Servers: each file directly inside ``<config>/servers`` is loaded as
    TOML; the file name without its extension is the server name. A
    serverThread is registered in ``servers``, the script named by the
    ``_script`` key is loaded into the server's namespace, and the thread
    is started.

    Sockets: each directory directly inside ``<config>/sockets`` is named
    after a server; each file directly inside it is a TOML socket
    description. A listening socket is opened for it and a socketThread
    attached to that server only. A directory with no matching server is
    skipped.

    NOTE(review): ``mfp`` is not defined in this file; ``mfp.p`` is
    presumably a path join and ``mfp.sd`` a base directory - confirm.
    """
    dirConfig = mfp.p(mfp.sd, "config")
    dirServers = mfp.p(dirConfig, "servers")
    dirSockets = mfp.p(dirConfig, "sockets")

    print("Starting servers ...")
    for root, dirs, files in os.walk(dirServers):
        for file in sorted(files):
            ffile = mfp.p(root, file)
            lfile = ffile.replace(dirServers + os.path.sep, "", 1)
            serverName = lfile.rsplit(".", 1)[0]
            print("> " + serverName)
            glb = mfp.Bunch()
            config = mfp.bunchify(toml.load(ffile))
            thr = serverThread(serverName, config, glb)
            servers[serverName] = thr
            mfp.dofile(config["_script"], glb)
            thr.start()
        # Only the top level is scanned: stop after os.walk's first entry.
        break

    print("\nStarting sockets ...")
    for root, dirs, files in os.walk(dirSockets):
        for file in sorted(dirs):
            ffile = mfp.p(root, file)
            serverName = ffile.replace(dirSockets + os.path.sep, "", 1)
            print("> " + serverName)
            with lockServers:
                server = servers.get(serverName)
            if server is None:
                # Without an owner there is nobody to serve the connections,
                # so do not bind anything for this directory.
                print(">> No server named " + serverName + ", skipping its sockets")
                continue
            for root2, dirs2, files2 in os.walk(ffile):
                for file2 in sorted(files2):
                    ffile2 = mfp.p(root2, file2)
                    lfile2 = ffile2.replace(ffile + os.path.sep, "", 1)
                    socketName = lfile2.rsplit(".", 1)[0]
                    print(">> " + socketName)
                    config = mfp.bunchify(toml.load(ffile2))
                    serverSocket = _openListener(config)
                    # Attach the socket to the server its directory names,
                    # not to every registered server.
                    with lockServers:
                        with server.cLockSockets:
                            socketThr = socketThread(server, config, serverSocket)
                            server.cSockets.append(socketThr)
                            socketThr.start()
                # Only files directly inside the server's directory count.
                break
        # Only directories directly inside the sockets directory count.
        break
    print("\nReady!")