fhttpy/modules/http/helpers.py

214 lines
5.0 KiB
Python

global time
import time
global urllib
import urllib.parse
global html
import html
global getHeaderFromConnection
def getHeaderFromConnection(connection):
start = time.process_time()
timeo = connection.gettimeout()
l = 0
nl = 0
header = ""
while True:
b = connection.recv(1)
if b == b"": raise ConnectionResetError
if time.process_time() - start > timeo: raise TimeoutError
l += 1
if l > maxHeaderLength:
connection.sendall("""\
HTTP 1.1 413 Payload Too Large\r
\r
""".encode("ascii"))
raise excConnectionClosed
bd = None
try:
bd = b.decode("ascii")
except:
connection.sendall("""\
HTTP 1.1 400 Bad Request\r
\r
""".encode("ASCII"))
raise excConnectionClosed
if bd == "\n":
nl += 1
if nl == 2:
return header
else:
if bd != "\r":
nl = 0
header += bd
global parseHeader
def parseHeader(headers):
headers = headers.replace("\r","").split("\n")
del headers[-1]
for i in range(len(headers)):
headers[i] = headers[i].strip(" \t")
mainHeader = headers.pop(0).split(" ")
index = 0
length = len(mainHeader)
while index < length:
val = mainHeader[index]
val = val.strip(" \t")
if val == "":
del mainHeader[index]
length -= 1
continue
index += 1
mainHeader[0] = mainHeader[0].lower()
mainHeader[-1] = mainHeader[-1].lower()
headerList = {}
for header in headers:
header = header.split(":",1)
if len(header) != 2: continue
headerKey = header[0].strip(" \t").lower()
headerValue = header[1].strip(" \t")
if headerKey in headerList:
headers[headerKey] += ", " +headerValue
else:
headerList[headerKey] = headerValue
return mainHeader,headerList
global parseHeaderPath
def parseHeaderPath(path):
path = path.split("?",1)
if len(path) < 2: path.append("")
args = {}
for arg in path[1].split("&"):
arg = arg.split("=",1)
if len(arg) < 2: arg.append("")
args[urllib.parse.unquote(arg[0]).lower()] = urllib.parse.unquote(arg[1])
return urllib.parse.unquote(path[0]),args
global fixUserPath
def fixUserPath(path):
path = path.replace("\\","/") # Replace backslash with forward slash
path = path.lstrip("/")
npath = ""
for pathbit in path.split("/"):
pathbit = pathbit.strip(" \t\r\n") # Remove spaces, tabs, line return, and new line
if pathbit in [".",".."]: # Remove . and ..
continue
npath += pathbit + "/"
npath = npath[:-1]
while "//" in npath: npath = npath.replace("//","/") # Remove double slashes
return npath
global simpleResponse
def simpleResponse(connection,status,headers = None,content = None,autolength = True):
if headers == None:
headers = {}
if not "Accept-Ranges" in headers:
headers["Accept-Ranges"] = "none"
if content != None and autolength == True:
headers["Content-Length"] = str(len(content))
if allowKeepAlive and not ("Connection" in headers):
headers["Connection"] = "keep-alive"
headers["Keep-Alive"] = "timeout=" +str(timeout)
response = 'HTTP/1.1 ' +status+ '\r\n'
for header in headers:
response += header + ": " +headers[header] + "\r\n"
response += "\r\n"
connection.sendall(response.encode("ascii"))
if content != None:
connection.sendall(content)
global refer
def refer(connection,path):
simpleResponse(
connection,"302 Found",
{
"Content-Type": "text/html; charset=ASCII",
"Location": path
},('''\
<html>
<head></head>
<body>
Referring you to <a href="''' +html.escape(path)+ '''">''' +html.escape(path)+ '''</a>...
</body>
</html>''').encode("ascii")
)
global notFound
def notFound(connection,path):
simpleResponse(
connection,"404 Not Found",
{
"Content-Type": "text/html; charset=ASCII"
},('''\
<html>
<head></head>
<body>
Not found: <a href="''' +html.escape(path)+ '''">''' +html.escape(path)+ '''</a>
</body>
</html>''').encode("ascii")
)
global pathToURL
def pathToURL(path):
path = path.split("/")
length = len(path)
index = 0
while index < length:
path[index] = urllib.parse.quote(path[index])
index += 1
path = "/".join(path)
return path
# Can return the following:
# positive integer, None: Send entire file content starting at arg 1
# negative integer, None: Send entire file content starting at file end + arg 1
# positive integer, positive integer: Send entire file, from arg 1 to arg 2, not including arg 2
global getRange
def getRange(range):
try:
range = range.split("=",1)
if range[0].strip("\t ") != "bytes": return None,None
range = range[1].split(",")[0].split("-")
range[0] = range[0].strip("\t ")
range[1] = range[1].strip("\t ")
if range[0] == "":
return 0 - int(range[1]),None
if range[1] == "":
return int(range[0]),None
return int(range[0]),int(range[1]) + 1
except:
return 0,None
global convertRanges
def convertRanges(rangeStart,rangeEnd,length):
# Convert given ranges into complete ranges
if rangeStart < 0:
rangeStart = length - rangeStart
rangeEnd = length
else:
if rangeEnd == None:
rangeEnd = length
# Check if the ranges make sense
if rangeStart < 0:
return None,None
if rangeEnd > length:
return None,None
if rangeStart > rangeEnd:
return None,None
# OK
return rangeStart,rangeEnd