Separate main and frontend code

This commit is contained in:
Fierelier 2023-05-02 06:25:55 +02:00
parent be8b12d481
commit a6b4e38cb6
3 changed files with 313 additions and 298 deletions

View File

@ -1 +1,2 @@
main = "$DISTRO.main" main = "$DISTRO.main"
frontend = "$DISTRO.frontend.terminal_simple"

View File

@ -0,0 +1,278 @@
import os
import json
import logging
import re
import readchar
import getpass
import textwrap
import threading
import platform
if platform.system().startswith("Windows"):
import win_unicode_console
win_unicode_console.enable()
from collections import OrderedDict
from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi
import app
contentapi = app.loadmod("contentapi")
config = app.mod["main"].config
MAXTITLE=25
MSGPREFIX=Back.GREEN + " " + Back.RESET + " "
# The command dictionary (only used to display help)
commands = OrderedDict([
("h", "Help, prints this menu!"),
("s", "Search, find and/or set a room to listen to (one at a time!)"),
("g", "Global userlist, print users using contentapi in general"),
("u", "Userlist, print users in the current room"),
("i", "Insert mode, allows you to send a message (pauses messages!)"),
("c", "Clear; clear screen and load last N messages manually"),
("t", "Statistics, see info about runtime"),
("q", "Quit, no warning!")
])
def main(config,context):
logging.info("Testing connection to API at " + config["api"])
logging.debug(json.dumps(context.api_status(), indent = 2))
def ws_onclose(ws):
print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room)
exit()
# When the websocket is opened, we begin our thread to accept input forever
def ws_onopen(ws):
def main_loop():
printr(Fore.GREEN + Style.BRIGHT + os.linesep + "-- Connected to live updates! --")
if not ws.current_room:
printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *")
print_statusline(ws)
# The infinite input loop! Or something!
while True:
printstatus = False # Assume we are not printing the status every time (it's kinda annoying)
# Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike
# to 100%, so we switch to getch() if we detect windows.
if platform.system().startswith("Windows"):
import msvcrt
logging.debug("on windows, using msvcrt.getch")
key = msvcrt.getch().decode("utf-8")
else:
key = readchar.readkey()
# We get exclusive access to output while we're handling user input. This allows us to "pause" any output
# the threaded output queue might want to do (we're more important)
with ws.output_lock:
if key == "h":
print(" -- Help menu / Controls --")
for key, value in commands.items():
print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value)
elif key == "s":
search(ws)
printstatus = True
elif key == "g":
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global"))
elif key == "u":
if not ws.current_room:
print("You're not in a room! Can't check userlist!")
else:
# Just send it out, we have to wait for the websocket handler to get the response
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room))
elif key == "i":
if not ws.current_room:
print("You're not in a room! Can't send messages!")
else:
message = input("Post (empty = exit): ")
if message:
ws.context.post_message(contentapi.comment_builder(message,
ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"]))
printstatus = ws.main_config["print_status_after_insert"]
elif key == "c":
print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"])
print(get_message_history_string(ws))
printstatus = True
elif key == "t":
print(" -- Ignored WS Data (normal) --")
for key,value in ws.ignored.items():
printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value))
elif key == "q":
print("Quitting (may take a bit for the websocket to close)")
ws.close()
break
elif key == " ":
printstatus = True
# At the end of the loop, but still in the printing lock, print the status line (if they want)
if printstatus:
print_statusline(ws)
# Just a simple infinite loop which blocks on the queue until something is available
def ws_print_loop():
while True:
next_output = ws.output_buffer.get()
with ws.output_lock:
printr(next_output)
# Set the main room; we want to wait until the websocket is open because this also sets your
# status in the userlist
if ws.main_config["default_room"]:
set_current_room(ws, ws.main_config["default_room"])
# create a thread to run the blocking task
mainthread = threading.Thread(target=main_loop)
mainthread.start()
# create a thread to process websocket output
printthread = threading.Thread(target=ws_print_loop)
printthread.daemon = True
printthread.start()
# Message handler for our websocket; will handle live messages for the room you're listening to and
# userlist updates request results, but not much else (for now)
def ws_onmessage(ws, message):
logging.debug("WSRCV: " + message)
result = json.loads(message)
# Someone asked for the userlist, check the id to figure out what to print and which list to see
if result["type"] == "userlist":
all_statuses = result["data"]["statuses"]
if result["id"] == "userlist_global":
usermessage = " -- Global userlist --"
statuses = all_statuses["0"] if "0" in all_statuses else {}
else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?)
usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"]
statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {}
userlist_output = usermessage
for key,value in statuses.items():
key = int(key)
user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key)
userlist_output += os.linesep + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value
ws_print(ws, userlist_output)
return
# Live updates are messages, edits, user updates, etc. Check the event list to see
elif result["type"] == "live":
# We only care about SOME live updates
for event in result["data"]["events"]:
if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message
objects = result["data"]["objects"]["message_event"]
message = contentapi.get_message(objects["message"], event["refId"])
if message and message["contentId"] == ws.current_room:
# OK we're DEFINITELY displaying it now
user = contentapi.get_user_or_default(objects["user"], message["createUserId"])
ws_print(ws, get_message_string(ws, message, user))
# Track ignored data
if result["type"] not in ws.ignored:
ws.ignored[result["type"]] = 0
ws.ignored[result["type"]] += 1
# Produce the string output for a given message. Can be printed directly to console
def get_message_string(ws, message, user):
result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] +
Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + os.linesep + Style.RESET_ALL)
for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]):
result += (MSGPREFIX + t + os.linesep)
return result.rstrip(os.linesep)
# Produce a large string of output for all history in the current room. Can be printed directly to console
def get_message_history_string(ws):
if ws.current_room:
result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement")
users = result["objects"]["user"]
message_block = ""
for message in reversed(result["objects"]["message"]):
user = contentapi.get_user_or_default(users, message["createUserId"])
message_block += get_message_string(ws, message, user) + os.linesep
return message_block.rstrip(os.linesep)
return None
# Printing websocket event output is tricky because we don't want to interrupt user input (we don't have
# curses). As such, we must buffer our output IF we are asked to pause
def ws_print(ws, output):
# Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up
# the printing thread automatically
ws.output_buffer.put(output)
# Set the room to listen to on the websocket. Will also update the userlist, if
# it's appropriate to do so
def set_current_room(ws, roomid):
try:
ws.current_room_data = ws.context.get_by_id("content", roomid)
ws.current_room = roomid
# Generate the new user status list. Must always send the full list every time.
statuses = { "%d" % roomid : "active" }
if ws.main_config["appear_in_global"]:
statuses["0"] = "active"
ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses))
print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL)
return
except Exception as ex:
print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL)
# Enter a search loop which will repeat until you quit. Output should be PAUSED here
# (but someone else does it for us, we don't even know what 'pausing' is)
def search(ws):
while True:
searchterm = input("Search text (#ROOMNUM = set room, # to quit): ")
if searchterm == "#":
return
match = re.match(r'#(\d+)', searchterm)
if match:
roomid = int(match.group(1))
set_current_room(ws, roomid)
return
elif searchterm:
# Go search for rooms and display them
result = ws.context.basic_search(searchterm)["objects"]["content"]
if len(result):
for content in result:
printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"])
else:
printr(Style.DIM + " -- No results -- ")
# Either pull the token from a file, or get the login from the command
# line if that doesn't work. WILL test your token against the real API
# even if it's pulled from file!
def authenticate(context: contentapi.ApiContext,message):
while True:
print(message)
username = input("Username: ")
password = getpass.getpass("Password: ")
try:
token = context.login(username, password, config["expire_seconds"])
with open(config["tokenfile"], 'w') as f:
f.write(token)
logging.info("Token accepted, written to " + config["tokenfile"])
context.token = token
return
except Exception as ex:
print("ERROR: %s" % ex)
message = "Please try logging in again:"
def print_statusline(ws):
# if ws_context.connected: bg = Back.GREEN else: bg = Back.RED
if ws.current_room:
name = ws.current_room_data["name"]
room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'"
else:
room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK
print(Back.GREEN + Fore.BLACK + os.linesep + " " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL)
# Print and then reset the style
def printr(msg):
print(msg + Style.RESET_ALL)

View File

@ -1,30 +1,22 @@
import os import os
import json import json
import logging import logging
import getpass
import textwrap
import threading import threading
import platform import toml
import queue
import re
import toml
import readchar
import websocket import websocket
import win_unicode_console import queue
from collections import OrderedDict
from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi
import app import app
contentapi = app.loadmod("contentapi") contentapi = app.loadmod("contentapi")
utils = app.loadmod("utils") utils = app.loadmod("utils")
def appInit(): # This is ran after all modules are loaded
global frontend
frontend = app.mod["frontend"]
VERSION=(0,2,0) # Arbitrary but whatever VERSION=(0,2,0) # Arbitrary but whatever
VERSION="0.2.0" # Arbitrary but whatever
CONFIGFILE="config.toml" CONFIGFILE="config.toml"
MAXTITLE=25
MSGPREFIX=Back.GREEN + " " + Back.RESET + " "
# The entire config object with all defaults # The entire config object with all defaults
config = { config = {
@ -42,37 +34,39 @@ config = {
"tokenfile" : ".qcstoken" "tokenfile" : ".qcstoken"
} }
# The command dictionary (only used to display help) # Loads the config from file into the global config var. If the file
commands = OrderedDict([ # doesn't exist, the file is created from the defaults in config.
("h", "Help, prints this menu!"), # The function returns nothing
("s", "Search, find and/or set a room to listen to (one at a time!)"), def load_or_create_global_config():
("g", "Global userlist, print users using contentapi in general"), global config
("u", "Userlist, print users in the current room"), # Check if the config file exists
("i", "Insert mode, allows you to send a message (pauses messages!)"), if os.path.isfile(CONFIGFILE):
("c", "Clear; clear screen and load last N messages manually"), # Read and deserialize the config file
("t", "Statistics, see info about runtime"), with open(CONFIGFILE, 'r', encoding='utf-8') as f:
("q", "Quit, no warning!") temp_config = toml.load(f)
]) utils.merge_dictionary(temp_config, config)
else:
# Serialize and write the config dictionary to the config file
logging.warn("No config found at " + CONFIGFILE + ", creating now")
with open(CONFIGFILE, 'w', encoding='utf-8') as f:
toml.dump(config, f)
utils.set_logging_level(config["default_loglevel"])
def main(): def main():
print("Program start: %s" % ".".join(map(str,VERSION))) print("Program start: %s" % ".".join(map(str,VERSION)))
win_unicode_console.enable()
colorama_init() # colorama init colorama_init() # colorama init
load_or_create_global_config() load_or_create_global_config()
logging.info("Config: " + json.dumps(config, indent = 2)) logging.info("Config: " + json.dumps(config, indent = 2))
context = contentapi.ApiContext(config["api"], logging)
logging.info("Testing connection to API at " + config["api"])
logging.debug(json.dumps(context.api_status(), indent = 2))
authenticate(config, context)
# Let users debug the websocket if they want I guess # Let users debug the websocket if they want I guess
if config["websocket_trace"]: if config["websocket_trace"]:
websocket.enableTrace(True) websocket.enableTrace(True)
context = contentapi.ApiContext(config["api"], logging)
frontend.main(config,context)
authenticate(config, context)
ws = websocket.WebSocketApp(context.websocket_endpoint()) ws = websocket.WebSocketApp(context.websocket_endpoint())
# Might as well reuse the websocket object for my websocket context data (oops, is that bad?) # Might as well reuse the websocket object for my websocket context data (oops, is that bad?)
@ -95,242 +89,13 @@ def main():
ws.current_room_data = False ws.current_room_data = False
# set the callback functions # set the callback functions
ws.on_open = ws_onopen ws.on_open = app.mod["frontend"].ws_onopen
ws.on_close = ws_onclose ws.on_close = app.mod["frontend"].ws_onclose
ws.on_message = ws_onmessage ws.on_message = app.mod["frontend"].ws_onmessage
ws.run_forever() ws.run_forever()
print("Program end")
def ws_onclose(ws):
print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room)
exit()
# When the websocket is opened, we begin our thread to accept input forever
def ws_onopen(ws):
def main_loop():
printr(Fore.GREEN + Style.BRIGHT + os.linesep + "-- Connected to live updates! --")
if not ws.current_room:
printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *")
print_statusline(ws)
# The infinite input loop! Or something!
while True:
printstatus = False # Assume we are not printing the status every time (it's kinda annoying)
# Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike
# to 100%, so we switch to getch() if we detect windows.
if platform.system().startswith("Windows"):
import msvcrt
logging.debug("on windows, using msvcrt.getch")
key = msvcrt.getch().decode("utf-8")
else:
key = readchar.readkey()
# We get exclusive access to output while we're handling user input. This allows us to "pause" any output
# the threaded output queue might want to do (we're more important)
with ws.output_lock:
if key == "h":
print(" -- Help menu / Controls --")
for key, value in commands.items():
print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value)
elif key == "s":
search(ws)
printstatus = True
elif key == "g":
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global"))
elif key == "u":
if not ws.current_room:
print("You're not in a room! Can't check userlist!")
else:
# Just send it out, we have to wait for the websocket handler to get the response
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room))
elif key == "i":
if not ws.current_room:
print("You're not in a room! Can't send messages!")
else:
message = input("Post (empty = exit): ")
if message:
ws.context.post_message(contentapi.comment_builder(message,
ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"]))
printstatus = ws.main_config["print_status_after_insert"]
elif key == "c":
print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"])
print(get_message_history_string(ws))
printstatus = True
elif key == "t":
print(" -- Ignored WS Data (normal) --")
for key,value in ws.ignored.items():
printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value))
elif key == "q":
print("Quitting (may take a bit for the websocket to close)")
ws.close()
break
elif key == " ":
printstatus = True
# At the end of the loop, but still in the printing lock, print the status line (if they want)
if printstatus:
print_statusline(ws)
# Just a simple infinite loop which blocks on the queue until something is available print("Program end")
def ws_print_loop():
while True:
next_output = ws.output_buffer.get()
with ws.output_lock:
printr(next_output)
# Set the main room; we want to wait until the websocket is open because this also sets your
# status in the userlist
if ws.main_config["default_room"]:
set_current_room(ws, ws.main_config["default_room"])
# create a thread to run the blocking task
mainthread = threading.Thread(target=main_loop)
mainthread.start()
# create a thread to process websocket output
printthread = threading.Thread(target=ws_print_loop)
printthread.daemon = True
printthread.start()
# Message handler for our websocket; will handle live messages for the room you're listening to and
# userlist updates request results, but not much else (for now)
def ws_onmessage(ws, message):
logging.debug("WSRCV: " + message)
result = json.loads(message)
# Someone asked for the userlist, check the id to figure out what to print and which list to see
if result["type"] == "userlist":
all_statuses = result["data"]["statuses"]
if result["id"] == "userlist_global":
usermessage = " -- Global userlist --"
statuses = all_statuses["0"] if "0" in all_statuses else {}
else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?)
usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"]
statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {}
userlist_output = usermessage
for key,value in statuses.items():
key = int(key)
user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key)
userlist_output += os.linesep + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value
ws_print(ws, userlist_output)
return
# Live updates are messages, edits, user updates, etc. Check the event list to see
elif result["type"] == "live":
# We only care about SOME live updates
for event in result["data"]["events"]:
if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message
objects = result["data"]["objects"]["message_event"]
message = contentapi.get_message(objects["message"], event["refId"])
if message and message["contentId"] == ws.current_room:
# OK we're DEFINITELY displaying it now
user = contentapi.get_user_or_default(objects["user"], message["createUserId"])
ws_print(ws, get_message_string(ws, message, user))
# Track ignored data
if result["type"] not in ws.ignored:
ws.ignored[result["type"]] = 0
ws.ignored[result["type"]] += 1
# Produce the string output for a given message. Can be printed directly to console
def get_message_string(ws, message, user):
result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] +
Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + os.linesep + Style.RESET_ALL)
for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]):
result += (MSGPREFIX + t + os.linesep)
return result.rstrip(os.linesep)
# Produce a large string of output for all history in the current room. Can be printed directly to console
def get_message_history_string(ws):
if ws.current_room:
result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement")
users = result["objects"]["user"]
message_block = ""
for message in reversed(result["objects"]["message"]):
user = contentapi.get_user_or_default(users, message["createUserId"])
message_block += get_message_string(ws, message, user) + os.linesep
return message_block.rstrip(os.linesep)
return None
# Printing websocket event output is tricky because we don't want to interrupt user input (we don't have
# curses). As such, we must buffer our output IF we are asked to pause
def ws_print(ws, output):
# Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up
# the printing thread automatically
ws.output_buffer.put(output)
# Loads the config from file into the global config var. If the file
# doesn't exist, the file is created from the defaults in config.
# The function returns nothing
def load_or_create_global_config():
global config
# Check if the config file exists
if os.path.isfile(CONFIGFILE):
# Read and deserialize the config file
with open(CONFIGFILE, 'r', encoding='utf-8') as f:
temp_config = toml.load(f)
utils.merge_dictionary(temp_config, config)
else:
# Serialize and write the config dictionary to the config file
logging.warn("No config found at " + CONFIGFILE + ", creating now")
with open(CONFIGFILE, 'w', encoding='utf-8') as f:
toml.dump(config, f)
utils.set_logging_level(config["default_loglevel"])
# Set the room to listen to on the websocket. Will also update the userlist, if
# it's appropriate to do so
def set_current_room(ws, roomid):
try:
ws.current_room_data = ws.context.get_by_id("content", roomid)
ws.current_room = roomid
# Generate the new user status list. Must always send the full list every time.
statuses = { "%d" % roomid : "active" }
if ws.main_config["appear_in_global"]:
statuses["0"] = "active"
ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses))
print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL)
return
except Exception as ex:
print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL)
# Enter a search loop which will repeat until you quit. Output should be PAUSED here
# (but someone else does it for us, we don't even know what 'pausing' is)
def search(ws):
while True:
searchterm = input("Search text (#ROOMNUM = set room, # to quit): ")
if searchterm == "#":
return
match = re.match(r'#(\d+)', searchterm)
if match:
roomid = int(match.group(1))
set_current_room(ws, roomid)
return
elif searchterm:
# Go search for rooms and display them
result = ws.context.basic_search(searchterm)["objects"]["content"]
if len(result):
for content in result:
printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"])
else:
printr(Style.DIM + " -- No results -- ")
# Either pull the token from a file, or get the login from the command # Either pull the token from a file, or get the login from the command
# line if that doesn't work. WILL test your token against the real API # line if that doesn't work. WILL test your token against the real API
@ -349,33 +114,4 @@ def authenticate(config, context: contentapi.ApiContext):
message = "Token file expired" message = "Token file expired"
message += ", Please enter login for " + config["api"] message += ", Please enter login for " + config["api"]
frontend.authenticate(context,message)
while True:
print(message)
username = input("Username: ")
password = getpass.getpass("Password: ")
try:
token = context.login(username, password, config["expire_seconds"])
with open(config["tokenfile"], 'w') as f:
f.write(token)
logging.info("Token accepted, written to " + config["tokenfile"])
context.token = token
return
except Exception as ex:
print("ERROR: %s" % ex)
message = "Please try logging in again:"
def print_statusline(ws):
# if ws_context.connected: bg = Back.GREEN else: bg = Back.RED
if ws.current_room:
name = ws.current_room_data["name"]
room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'"
else:
room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK
print(Back.GREEN + Fore.BLACK + os.linesep + " " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL)
# Print and then reset the style
def printr(msg):
print(msg + Style.RESET_ALL)