import os import json import logging import getpass import textwrap import threading import platform import queue import re import toml import readchar import websocket import win_unicode_console from collections import OrderedDict from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi import contentapi import myutils VERSION="0.2.0" # Arbitrary but whatever CONFIGFILE="config.toml" MAXTITLE=25 MSGPREFIX=Back.GREEN + " " + Back.RESET + " " # The entire config object with all defaults config = { "api" : "http://localhost:5000/api", "default_loglevel" : "WARNING", "websocket_trace" : False, "default_room" : 0, # Zero means it will ask you for a room "default_markup" : "plaintext", "expire_seconds" : 31536000, # 365 days in seconds, expiration for token "appear_in_global" : False, "print_status_after_insert" : True, "output_buffer_timeout" : 0.05, # 50 milliseconds "default_history" : 100, "fixed_width" : 78, # Need room for the pre-message decoration "tokenfile" : ".qcstoken" } # The command dictionary (only used to display help) commands = OrderedDict([ ("h", "Help, prints this menu!"), ("s", "Search, find and/or set a room to listen to (one at a time!)"), ("g", "Global userlist, print users using contentapi in general"), ("u", "Userlist, print users in the current room"), ("i", "Insert mode, allows you to send a message (pauses messages!)"), ("c", "Clear; clear screen and load last N messages manually"), ("t", "Statistics, see info about runtime"), ("q", "Quit, no warning!") ]) def main(): print("Program start: %s" % VERSION) win_unicode_console.enable() colorama_init() # colorama init load_or_create_global_config() logging.info("Config: " + json.dumps(config, indent = 2)) context = contentapi.ApiContext(config["api"], logging) logging.info("Testing connection to API at " + config["api"]) logging.debug(json.dumps(context.api_status(), indent = 2)) authenticate(config, context) # Let users debug the websocket if they want I guess if config["websocket_trace"]: websocket.enableTrace(True) ws = websocket.WebSocketApp(context.websocket_endpoint()) # Might as well reuse the websocket object for my websocket context data (oops, is that bad?) ws.context = context ws.user_info = context.user_me() ws.main_config = config ws.ignored = {} # Just a tracking list for fun stats # Output from the websocket can only get truly printed to the screen when this lock is released. # We grab the lock when the user is in some kind of "input" mode, which blocks the websocket printing # thread from processing the queue (if it has anything anyway) ws.output_lock = threading.Lock() # Output from the websocket is ALWAYS buffered, and we use a thread-safe queue to add and remove # output safely. We buffer all messages to ensure the order is preserved; if we SOMETIMES queued and # SOMETIMES did not, we would need to be very careful about whether the queue was empty, which requires # additional locking and etc. ws.output_buffer = queue.Queue() # Note that the current_room stuff is set on open if you've supplied a default room in config ws.current_room = 0 ws.current_room_data = False # set the callback functions ws.on_open = ws_onopen ws.on_close = ws_onclose ws.on_message = ws_onmessage ws.run_forever() print("Program end") def ws_onclose(ws): print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room) exit() # When the websocket is opened, we begin our thread to accept input forever def ws_onopen(ws): def main_loop(): printr(Fore.GREEN + Style.BRIGHT + "\n-- Connected to live updates! --") if not ws.current_room: printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *") print_statusline(ws) # The infinite input loop! Or something! while True: printstatus = False # Assume we are not printing the status every time (it's kinda annoying) # Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike # to 100%, so we switch to getch() if we detect windows. if platform.system().startswith("Windows"): import msvcrt logging.debug("on windows, using msvcrt.getch") key = msvcrt.getch().decode("utf-8") else: key = readchar.readkey() # We get exclusive access to output while we're handling user input. This allows us to "pause" any output # the threaded output queue might want to do (we're more important) with ws.output_lock: if key == "h": print(" -- Help menu / Controls --") for key, value in commands.items(): print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value) elif key == "s": search(ws) printstatus = True elif key == "g": ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global")) elif key == "u": if not ws.current_room: print("You're not in a room! Can't check userlist!") else: # Just send it out, we have to wait for the websocket handler to get the response ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room)) elif key == "i": if not ws.current_room: print("You're not in a room! Can't send messages!") else: message = input("Post (empty = exit): ") if message: ws.context.post_message(contentapi.comment_builder(message, ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"])) printstatus = ws.main_config["print_status_after_insert"] elif key == "c": print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"]) print(get_message_history_string(ws)) printstatus = True elif key == "t": print(" -- Ignored WS Data (normal) --") for key,value in ws.ignored.items(): printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value)) elif key == "q": print("Quitting (may take a bit for the websocket to close)") ws.close() break elif key == " ": printstatus = True # At the end of the loop, but still in the printing lock, print the status line (if they want) if printstatus: print_statusline(ws) # Just a simple infinite loop which blocks on the queue until something is available def ws_print_loop(): while True: next_output = ws.output_buffer.get() with ws.output_lock: printr(next_output) # Set the main room; we want to wait until the websocket is open because this also sets your # status in the userlist if ws.main_config["default_room"]: set_current_room(ws, ws.main_config["default_room"]) # create a thread to run the blocking task mainthread = threading.Thread(target=main_loop) mainthread.start() # create a thread to process websocket output printthread = threading.Thread(target=ws_print_loop) printthread.daemon = True printthread.start() # Message handler for our websocket; will handle live messages for the room you're listening to and # userlist updates request results, but not much else (for now) def ws_onmessage(ws, message): logging.debug("WSRCV: " + message) result = json.loads(message) # Someone asked for the userlist, check the id to figure out what to print and which list to see if result["type"] == "userlist": all_statuses = result["data"]["statuses"] if result["id"] == "userlist_global": usermessage = " -- Global userlist --" statuses = all_statuses["0"] if "0" in all_statuses else {} else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?) usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"] statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {} userlist_output = usermessage for key,value in statuses.items(): key = int(key) user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key) userlist_output += "\n" + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value ws_print(ws, userlist_output) return # Live updates are messages, edits, user updates, etc. Check the event list to see elif result["type"] == "live": # We only care about SOME live updates for event in result["data"]["events"]: if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message objects = result["data"]["objects"]["message_event"] message = contentapi.get_message(objects["message"], event["refId"]) if message and message["contentId"] == ws.current_room: # OK we're DEFINITELY displaying it now user = contentapi.get_user_or_default(objects["user"], message["createUserId"]) ws_print(ws, get_message_string(ws, message, user)) # Track ignored data if result["type"] not in ws.ignored: ws.ignored[result["type"]] = 0 ws.ignored[result["type"]] += 1 # Produce the string output for a given message. Can be printed directly to console def get_message_string(ws, message, user): result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] + Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + "\n" + Style.RESET_ALL) for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]): result += (MSGPREFIX + t + "\n") return result.rstrip("\n") # Produce a large string of output for all history in the current room. Can be printed directly to console def get_message_history_string(ws): if ws.current_room: result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement") users = result["objects"]["user"] message_block = "" for message in reversed(result["objects"]["message"]): user = contentapi.get_user_or_default(users, message["createUserId"]) message_block += get_message_string(ws, message, user) + "\n" return message_block.rstrip("\n") return None # Printing websocket event output is tricky because we don't want to interrupt user input (we don't have # curses). As such, we must buffer our output IF we are asked to pause def ws_print(ws, output): # Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up # the printing thread automatically ws.output_buffer.put(output) # Loads the config from file into the global config var. If the file # doesn't exist, the file is created from the defaults in config. # The function returns nothing def load_or_create_global_config(): global config # Check if the config file exists if os.path.isfile(CONFIGFILE): # Read and deserialize the config file with open(CONFIGFILE, 'r', encoding='utf-8') as f: temp_config = toml.load(f) myutils.merge_dictionary(temp_config, config) else: # Serialize and write the config dictionary to the config file logging.warn("No config found at " + CONFIGFILE + ", creating now") with open(CONFIGFILE, 'w', encoding='utf-8') as f: toml.dump(config, f) myutils.set_logging_level(config["default_loglevel"]) # Set the room to listen to on the websocket. Will also update the userlist, if # it's appropriate to do so def set_current_room(ws, roomid): try: ws.current_room_data = ws.context.get_by_id("content", roomid) ws.current_room = roomid # Generate the new user status list. Must always send the full list every time. statuses = { "%d" % roomid : "active" } if ws.main_config["appear_in_global"]: statuses["0"] = "active" ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses)) print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL) return except Exception as ex: print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL) # Enter a search loop which will repeat until you quit. Output should be PAUSED here # (but someone else does it for us, we don't even know what 'pausing' is) def search(ws): while True: searchterm = input("Search text (#ROOMNUM = set room, # to quit): ") if searchterm == "#": return match = re.match(r'#(\d+)', searchterm) if match: roomid = int(match.group(1)) set_current_room(ws, roomid) return elif searchterm: # Go search for rooms and display them result = ws.context.basic_search(searchterm)["objects"]["content"] if len(result): for content in result: printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"]) else: printr(Style.DIM + " -- No results -- ") # Either pull the token from a file, or get the login from the command # line if that doesn't work. WILL test your token against the real API # even if it's pulled from file! def authenticate(config, context: contentapi.ApiContext): message = "No token file found" if os.path.isfile(config["tokenfile"]): with open(config["tokenfile"], 'r') as f: token = f.read() logging.debug("Token from file: " + token) context.token = token if context.is_token_valid(): logging.info("Logged in using token file " + config["tokenfile"]) return else: message = "Token file expired" message += ", Please enter login for " + config["api"] while True: print(message) username = input("Username: ") password = getpass.getpass("Password: ") try: token = context.login(username, password, config["expire_seconds"]) with open(config["tokenfile"], 'w') as f: f.write(token) logging.info("Token accepted, written to " + config["tokenfile"]) context.token = token return except Exception as ex: print("ERROR: %s" % ex) message = "Please try logging in again:" def print_statusline(ws): # if ws_context.connected: bg = Back.GREEN else: bg = Back.RED if ws.current_room: name = ws.current_room_data["name"] room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'" else: room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK print(Back.GREEN + Fore.BLACK + "\n " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL) # Print and then reset the style def printr(msg): print(msg + Style.RESET_ALL) # Because python reasons if __name__ == "__main__": main()