From 5671ee8f558b4d0edcd1f3b928dcbf0d66ebd2bf Mon Sep 17 00:00:00 2001 From: Carlos Sanchez Date: Mon, 1 May 2023 20:13:35 -0400 Subject: [PATCH] Replace complicated crap with simple blocking queue --- main.py | 69 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/main.py b/main.py index 4c61f5c..9eb4562 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ import getpass import textwrap import threading import platform +import queue import re import toml import readchar @@ -74,8 +75,16 @@ def main(): # Might as well reuse the websocket object for my websocket context data (oops, is that bad?) ws.context = context ws.user_info = context.user_me() - ws.output_lock = threading.Lock() - ws.output_buffer = [] # Individual print statements buffered from output. + # Output from the websocket can only get truly printed to the screen when this lock is released. + # We grab the lock when the user is in some kind of "input" mode, which blocks the websocket printing + # thread from processing the queue (if it has anything anyway) + ws.output_lock = threading.Lock() + # Output from the websocket is ALWAYS buffered, and we use a thread-safe queue to add and remove + # output safely. We buffer all messages to ensure the order is preserved; if we SOMETIMES queued and + # SOMETIMES did not, we would need to be very careful about whether the queue was empty, which requires + # additional locking and etc. + ws.output_buffer = queue.Queue() + ws.main_config = config ws.current_room = 0 ws.current_room_data = False @@ -107,20 +116,17 @@ def ws_onopen(ws): if not ws.current_room: printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *") - printstatus = True + print_statusline(ws) # The infinite input loop! Or something! while True: - # Dump the buffer, if there is any, since we're outside of user input - with ws.output_lock: - for output in ws.output_buffer: - printr(output) + # # Dump the buffer, if there is any, since we're outside of user input + # with ws.output_lock: + # for output in ws.output_buffer: + # printr(output) - ws.output_buffer = [] - - if printstatus: - print_statusline(ws) + # ws.output_buffer = [] printstatus = False # Assume we are not printing the status every time (it's kinda annoying) @@ -175,14 +181,30 @@ def ws_onopen(ws): elif key == " ": printstatus = True + # At the end of the loop, but still in the printing lock, print the status line + if printstatus: + print_statusline(ws) + + # Just a simple infinite loop which blocks on the queue until something is available + def ws_print_loop(): + while True: + next_output = ws.output_buffer.get() + with ws.output_lock: + printr(next_output) + # Set the main room; we want to wait until the websocket is open because this also sets your # status in the userlist if ws.main_config["default_room"]: set_current_room(ws, ws.main_config["default_room"]) # create a thread to run the blocking task - thread = threading.Thread(target=main_loop) - thread.start() + mainthread = threading.Thread(target=main_loop) + mainthread.start() + + # create a thread to process websocket output + printthread = threading.Thread(target=ws_print_loop) + printthread.daemon = True + printthread.start() # Message handler for our websocket; will handle live messages for the room you're listening to and @@ -200,12 +222,12 @@ def ws_onmessage(ws, message): else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?) usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"] statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {} - ws_print(ws, usermessage) + userlist_output = usermessage for key,value in statuses.items(): key = int(key) user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key) - # Weird parenthesis are because I was aligning printed data before - ws_print(ws, Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value) + userlist_output += "\n" + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value + ws_print(ws, userlist_output) return # Live updates are messages, edits, user updates, etc. Check the event list to see elif result["type"] == "live": @@ -224,6 +246,7 @@ def ws_onmessage(ws, message): ws.ignored[result["type"]] = 0 ws.ignored[result["type"]] += 1 + # Produce the string output for a given message. Can be printed directly to console def get_message_string(ws, message, user): result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] + @@ -232,6 +255,7 @@ def get_message_string(ws, message, user): result += (MSGPREFIX + t + "\n") return result.rstrip("\n") + # Produce a large string of output for all history in the current room. Can be printed directly to console def get_message_history_string(ws): if ws.current_room: @@ -244,18 +268,13 @@ def get_message_history_string(ws): return message_block.rstrip("\n") return None + # Printing websocket event output is tricky because we don't want to interrupt user input (we don't have # curses). As such, we must buffer our output IF we are asked to pause def ws_print(ws, output): - # We can't block output for too long, so we need to only try to acquire the lock for a very short time - if ws.output_lock.acquire(timeout=ws.main_config["output_buffer_timeout"]): - try: - printr(output) - finally: - ws.output_lock.release() - else: - logging.debug("Buffering output") - ws.output_buffer.append(output) + # Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up + # the printing thread automatically + ws.output_buffer.put(output) # Loads the config from file into the global config var. If the file