qcs-python/main.py

390 lines
16 KiB
Python

import os
import json
import logging
import getpass
import textwrap
import threading
import platform
import queue
import re
import toml
import readchar
import websocket
import win_unicode_console
from collections import OrderedDict
from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi
import contentapi
import myutils
VERSION="0.2.0" # Arbitrary but whatever
CONFIGFILE="config.toml"
MAXTITLE=25
MSGPREFIX=Back.GREEN + " " + Back.RESET + " "
# The entire config object with all defaults
config = {
"api" : "http://localhost:5000/api",
"default_loglevel" : "WARNING",
"websocket_trace" : False,
"default_room" : 0, # Zero means it will ask you for a room
"default_markup" : "plaintext",
"expire_seconds" : 31536000, # 365 days in seconds, expiration for token
"appear_in_global" : False,
"print_status_after_insert" : True,
"output_buffer_timeout" : 0.05, # 50 milliseconds
"default_history" : 100,
"fixed_width" : 78, # Need room for the pre-message decoration
"tokenfile" : ".qcstoken"
}
# The command dictionary (only used to display help)
commands = OrderedDict([
("h", "Help, prints this menu!"),
("s", "Search, find and/or set a room to listen to (one at a time!)"),
("g", "Global userlist, print users using contentapi in general"),
("u", "Userlist, print users in the current room"),
("i", "Insert mode, allows you to send a message (pauses messages!)"),
("c", "Clear; clear screen and load last N messages manually"),
("t", "Statistics, see info about runtime"),
("q", "Quit, no warning!")
])
def main():
print("Program start: %s" % VERSION)
win_unicode_console.enable()
colorama_init() # colorama init
load_or_create_global_config()
logging.info("Config: " + json.dumps(config, indent = 2))
context = contentapi.ApiContext(config["api"], logging)
logging.info("Testing connection to API at " + config["api"])
logging.debug(json.dumps(context.api_status(), indent = 2))
authenticate(config, context)
# Let users debug the websocket if they want I guess
if config["websocket_trace"]:
websocket.enableTrace(True)
ws = websocket.WebSocketApp(context.websocket_endpoint())
# Might as well reuse the websocket object for my websocket context data (oops, is that bad?)
ws.context = context
ws.user_info = context.user_me()
# Output from the websocket can only get truly printed to the screen when this lock is released.
# We grab the lock when the user is in some kind of "input" mode, which blocks the websocket printing
# thread from processing the queue (if it has anything anyway)
ws.output_lock = threading.Lock()
# Output from the websocket is ALWAYS buffered, and we use a thread-safe queue to add and remove
# output safely. We buffer all messages to ensure the order is preserved; if we SOMETIMES queued and
# SOMETIMES did not, we would need to be very careful about whether the queue was empty, which requires
# additional locking and etc.
ws.output_buffer = queue.Queue()
ws.main_config = config
ws.current_room = 0
ws.current_room_data = False
ws.ignored = {}
# Note that the current_room stuff is set on open if you've supplied a default room in config
# set the callback functions
ws.on_open = ws_onopen
ws.on_close = ws_onclose
ws.on_message = ws_onmessage
ws.run_forever()
print("Program end")
def ws_onclose(ws):
print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room)
exit()
# When the websocket is opened, we begin our thread to accept input forever
def ws_onopen(ws):
def main_loop():
printr(Fore.GREEN + Style.BRIGHT + "\n-- Connected to live updates! --")
if not ws.current_room:
printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *")
print_statusline(ws)
# The infinite input loop! Or something!
while True:
# # Dump the buffer, if there is any, since we're outside of user input
# with ws.output_lock:
# for output in ws.output_buffer:
# printr(output)
# ws.output_buffer = []
printstatus = False # Assume we are not printing the status every time (it's kinda annoying)
# Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike
# to 100%, so we switch to getch() if we detect windows.
if platform.system().startswith("Windows"):
import msvcrt
logging.debug("on windows, using msvcrt.getch")
key = msvcrt.getch().decode("utf-8")
else:
key = readchar.readkey()
# We get exclusive access to output while we're handling user input. This allows us to "pause/buffer" the
# output that might come from websocket events
with ws.output_lock:
if key == "h":
print(" -- Help menu / Controls --")
for key, value in commands.items():
print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value)
elif key == "s":
search(ws)
printstatus = True
elif key == "g":
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global"))
elif key == "u":
if not ws.current_room:
print("You're not in a room! Can't check userlist!")
else:
# Just send it out, we have to wait for the websocket handler to get the response
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room))
elif key == "i":
if not ws.current_room:
print("You're not in a room! Can't send messages!")
else:
message = input("Post (empty = exit): ")
if message:
ws.context.post_message(contentapi.comment_builder(message,
ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"]))
printstatus = ws.main_config["print_status_after_insert"]
elif key == "c":
print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"])
print(get_message_history_string(ws))
printstatus = True
elif key == "t":
print(" -- Ignored WS Data (normal) --")
for key,value in ws.ignored.items():
printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value))
elif key == "q":
print("Quitting (may take a bit for the websocket to close)")
ws.close()
break
elif key == " ":
printstatus = True
# At the end of the loop, but still in the printing lock, print the status line
if printstatus:
print_statusline(ws)
# Just a simple infinite loop which blocks on the queue until something is available
def ws_print_loop():
while True:
next_output = ws.output_buffer.get()
with ws.output_lock:
printr(next_output)
# Set the main room; we want to wait until the websocket is open because this also sets your
# status in the userlist
if ws.main_config["default_room"]:
set_current_room(ws, ws.main_config["default_room"])
# create a thread to run the blocking task
mainthread = threading.Thread(target=main_loop)
mainthread.start()
# create a thread to process websocket output
printthread = threading.Thread(target=ws_print_loop)
printthread.daemon = True
printthread.start()
# Message handler for our websocket; will handle live messages for the room you're listening to and
# userlist updates request results, but not much else (for now)
def ws_onmessage(ws, message):
logging.debug("WSRCV: " + message)
result = json.loads(message)
# Someone asked for the userlist, check the id to figure out what to print and which list to see
if result["type"] == "userlist":
all_statuses = result["data"]["statuses"]
if result["id"] == "userlist_global":
usermessage = " -- Global userlist --"
statuses = all_statuses["0"] if "0" in all_statuses else {}
else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?)
usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"]
statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {}
userlist_output = usermessage
for key,value in statuses.items():
key = int(key)
user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key)
userlist_output += "\n" + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value
ws_print(ws, userlist_output)
return
# Live updates are messages, edits, user updates, etc. Check the event list to see
elif result["type"] == "live":
# We only care about SOME live updates
for event in result["data"]["events"]:
if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message
objects = result["data"]["objects"]["message_event"]
message = contentapi.get_message(objects["message"], event["refId"])
if message and message["contentId"] == ws.current_room:
# OK we're DEFINITELY displaying it now
user = contentapi.get_user_or_default(objects["user"], message["createUserId"])
ws_print(ws, get_message_string(ws, message, user))
# Track ignored data
if result["type"] not in ws.ignored:
ws.ignored[result["type"]] = 0
ws.ignored[result["type"]] += 1
# Produce the string output for a given message. Can be printed directly to console
def get_message_string(ws, message, user):
result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] +
Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + "\n" + Style.RESET_ALL)
for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]):
result += (MSGPREFIX + t + "\n")
return result.rstrip("\n")
# Produce a large string of output for all history in the current room. Can be printed directly to console
def get_message_history_string(ws):
if ws.current_room:
result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement")
users = result["objects"]["user"]
message_block = ""
for message in reversed(result["objects"]["message"]):
user = contentapi.get_user_or_default(users, message["createUserId"])
message_block += get_message_string(ws, message, user) + "\n"
return message_block.rstrip("\n")
return None
# Printing websocket event output is tricky because we don't want to interrupt user input (we don't have
# curses). As such, we must buffer our output IF we are asked to pause
def ws_print(ws, output):
# Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up
# the printing thread automatically
ws.output_buffer.put(output)
# Loads the config from file into the global config var. If the file
# doesn't exist, the file is created from the defaults in config.
# The function returns nothing
def load_or_create_global_config():
global config
# Check if the config file exists
if os.path.isfile(CONFIGFILE):
# Read and deserialize the config file
with open(CONFIGFILE, 'r', encoding='utf-8') as f:
temp_config = toml.load(f)
myutils.merge_dictionary(temp_config, config)
else:
# Serialize and write the config dictionary to the config file
logging.warn("No config found at " + CONFIGFILE + ", creating now")
with open(CONFIGFILE, 'w', encoding='utf-8') as f:
toml.dump(config, f)
myutils.set_logging_level(config["default_loglevel"])
# Set the room to listen to on the websocket. Will also update the userlist, if
# it's appropriate to do so
def set_current_room(ws, roomid):
try:
ws.current_room_data = ws.context.get_by_id("content", roomid)
ws.current_room = roomid
# Generate the new user status list. Must always send the full list every time.
statuses = { "%d" % roomid : "active" }
if ws.main_config["appear_in_global"]:
statuses["0"] = "active"
ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses))
print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL)
return
except Exception as ex:
print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL)
# Enter a search loop which will repeat until you quit. Output should be PAUSED here
# (but someone else does it for us, we don't even know what 'pausing' is)
def search(ws):
while True:
searchterm = input("Search text (#ROOMNUM = set room, # to quit): ")
if searchterm == "#":
return
match = re.match(r'#(\d+)', searchterm)
if match:
roomid = int(match.group(1))
set_current_room(ws, roomid)
return
elif searchterm:
# Go search for rooms and display them
result = ws.context.basic_search(searchterm)["objects"]["content"]
if len(result):
for content in result:
printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"])
else:
printr(Style.DIM + " -- No results -- ")
# Either pull the token from a file, or get the login from the command
# line if that doesn't work. WILL test your token against the real API
# even if it's pulled from file!
def authenticate(config, context: contentapi.ApiContext):
message = "No token file found"
if os.path.isfile(config["tokenfile"]):
with open(config["tokenfile"], 'r') as f:
token = f.read()
logging.debug("Token from file: " + token)
context.token = token
if context.is_token_valid():
logging.info("Logged in using token file " + config["tokenfile"])
return
else:
message = "Token file expired"
message += ", Please enter login for " + config["api"]
while True:
print(message)
username = input("Username: ")
password = getpass.getpass("Password: ")
try:
token = context.login(username, password, config["expire_seconds"])
with open(config["tokenfile"], 'w') as f:
f.write(token)
logging.info("Token accepted, written to " + config["tokenfile"])
context.token = token
return
except Exception as ex:
print("ERROR: %s" % ex)
message = "Please try logging in again:"
def print_statusline(ws):
# if ws_context.connected: bg = Back.GREEN else: bg = Back.RED
if ws.current_room:
name = ws.current_room_data["name"]
room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'"
else:
room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK
print(Back.GREEN + Fore.BLACK + "\n " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL)
# Print and then reset the style
def printr(msg):
print(msg + Style.RESET_ALL)
# Because python reasons
if __name__ == "__main__":
main()