Rudimentarily refiddled to support modules

This commit is contained in:
Fierelier 2023-05-02 04:16:22 +02:00
parent 6aee75c803
commit 9d0b658444
10 changed files with 467 additions and 386 deletions

404
main.py Normal file → Executable file
View File

@ -1,384 +1,20 @@
import os
import json
import logging
import getpass
import textwrap
import threading
import platform
import queue
import re
import toml
import readchar
import websocket
import win_unicode_console
from collections import OrderedDict
from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi
import contentapi
import myutils
VERSION="0.2.0" # Arbitrary but whatever
CONFIGFILE="config.toml"
MAXTITLE=25
MSGPREFIX=Back.GREEN + " " + Back.RESET + " "
# The entire config object with all defaults
config = {
"api" : "http://localhost:5000/api",
"default_loglevel" : "WARNING",
"websocket_trace" : False,
"default_room" : 0, # Zero means it will ask you for a room
"default_markup" : "plaintext",
"expire_seconds" : 31536000, # 365 days in seconds, expiration for token
"appear_in_global" : False,
"print_status_after_insert" : True,
"output_buffer_timeout" : 0.05, # 50 milliseconds
"default_history" : 100,
"fixed_width" : 76, # Need room for the pre-message decoration
"tokenfile" : ".qcstoken"
}
# The command dictionary (only used to display help)
commands = OrderedDict([
("h", "Help, prints this menu!"),
("s", "Search, find and/or set a room to listen to (one at a time!)"),
("g", "Global userlist, print users using contentapi in general"),
("u", "Userlist, print users in the current room"),
("i", "Insert mode, allows you to send a message (pauses messages!)"),
("c", "Clear; clear screen and load last N messages manually"),
("t", "Statistics, see info about runtime"),
("q", "Quit, no warning!")
])
def main():
print("Program start: %s" % VERSION)
win_unicode_console.enable()
colorama_init() # colorama init
load_or_create_global_config()
logging.info("Config: " + json.dumps(config, indent = 2))
context = contentapi.ApiContext(config["api"], logging)
logging.info("Testing connection to API at " + config["api"])
logging.debug(json.dumps(context.api_status(), indent = 2))
authenticate(config, context)
# Let users debug the websocket if they want I guess
if config["websocket_trace"]:
websocket.enableTrace(True)
ws = websocket.WebSocketApp(context.websocket_endpoint())
# Might as well reuse the websocket object for my websocket context data (oops, is that bad?)
ws.context = context
ws.user_info = context.user_me()
ws.main_config = config
ws.ignored = {} # Just a tracking list for fun stats
# Output from the websocket can only get truly printed to the screen when this lock is released.
# We grab the lock when the user is in some kind of "input" mode, which blocks the websocket printing
# thread from processing the queue (if it has anything anyway)
ws.output_lock = threading.Lock()
# Output from the websocket is ALWAYS buffered, and we use a thread-safe queue to add and remove
# output safely. We buffer all messages to ensure the order is preserved; if we SOMETIMES queued and
# SOMETIMES did not, we would need to be very careful about whether the queue was empty, which requires
# additional locking and etc.
ws.output_buffer = queue.Queue()
# Note that the current_room stuff is set on open if you've supplied a default room in config
ws.current_room = 0
ws.current_room_data = False
# set the callback functions
ws.on_open = ws_onopen
ws.on_close = ws_onclose
ws.on_message = ws_onmessage
ws.run_forever()
print("Program end")
def ws_onclose(ws):
print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room)
exit()
# When the websocket is opened, we begin our thread to accept input forever
def ws_onopen(ws):
def main_loop():
printr(Fore.GREEN + Style.BRIGHT + os.linesep + "-- Connected to live updates! --")
if not ws.current_room:
printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *")
print_statusline(ws)
# The infinite input loop! Or something!
while True:
printstatus = False # Assume we are not printing the status every time (it's kinda annoying)
# Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike
# to 100%, so we switch to getch() if we detect windows.
if platform.system().startswith("Windows"):
import msvcrt
logging.debug("on windows, using msvcrt.getch")
key = msvcrt.getch().decode("utf-8")
else:
key = readchar.readkey()
# We get exclusive access to output while we're handling user input. This allows us to "pause" any output
# the threaded output queue might want to do (we're more important)
with ws.output_lock:
if key == "h":
print(" -- Help menu / Controls --")
for key, value in commands.items():
print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value)
elif key == "s":
search(ws)
printstatus = True
elif key == "g":
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global"))
elif key == "u":
if not ws.current_room:
print("You're not in a room! Can't check userlist!")
else:
# Just send it out, we have to wait for the websocket handler to get the response
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room))
elif key == "i":
if not ws.current_room:
print("You're not in a room! Can't send messages!")
else:
message = input("Post (empty = exit): ")
if message:
ws.context.post_message(contentapi.comment_builder(message,
ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"]))
printstatus = ws.main_config["print_status_after_insert"]
elif key == "c":
print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"])
print(get_message_history_string(ws))
printstatus = True
elif key == "t":
print(" -- Ignored WS Data (normal) --")
for key,value in ws.ignored.items():
printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value))
elif key == "q":
print("Quitting (may take a bit for the websocket to close)")
ws.close()
break
elif key == " ":
printstatus = True
# At the end of the loop, but still in the printing lock, print the status line (if they want)
if printstatus:
print_statusline(ws)
# Just a simple infinite loop which blocks on the queue until something is available
def ws_print_loop():
while True:
next_output = ws.output_buffer.get()
with ws.output_lock:
printr(next_output)
# Set the main room; we want to wait until the websocket is open because this also sets your
# status in the userlist
if ws.main_config["default_room"]:
set_current_room(ws, ws.main_config["default_room"])
# create a thread to run the blocking task
mainthread = threading.Thread(target=main_loop)
mainthread.start()
# create a thread to process websocket output
printthread = threading.Thread(target=ws_print_loop)
printthread.daemon = True
printthread.start()
# Message handler for our websocket; will handle live messages for the room you're listening to and
# userlist updates request results, but not much else (for now)
def ws_onmessage(ws, message):
logging.debug("WSRCV: " + message)
result = json.loads(message)
# Someone asked for the userlist, check the id to figure out what to print and which list to see
if result["type"] == "userlist":
all_statuses = result["data"]["statuses"]
if result["id"] == "userlist_global":
usermessage = " -- Global userlist --"
statuses = all_statuses["0"] if "0" in all_statuses else {}
else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?)
usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"]
statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {}
userlist_output = usermessage
for key,value in statuses.items():
key = int(key)
user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key)
userlist_output += os.linesep + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value
ws_print(ws, userlist_output)
return
# Live updates are messages, edits, user updates, etc. Check the event list to see
elif result["type"] == "live":
# We only care about SOME live updates
for event in result["data"]["events"]:
if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message
objects = result["data"]["objects"]["message_event"]
message = contentapi.get_message(objects["message"], event["refId"])
if message and message["contentId"] == ws.current_room:
# OK we're DEFINITELY displaying it now
user = contentapi.get_user_or_default(objects["user"], message["createUserId"])
ws_print(ws, get_message_string(ws, message, user))
# Track ignored data
if result["type"] not in ws.ignored:
ws.ignored[result["type"]] = 0
ws.ignored[result["type"]] += 1
# Produce the string output for a given message. Can be printed directly to console
def get_message_string(ws, message, user):
result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] +
Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + os.linesep + Style.RESET_ALL)
for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]):
result += (MSGPREFIX + t + os.linesep)
return result.rstrip(os.linesep)
# Produce a large string of output for all history in the current room. Can be printed directly to console
def get_message_history_string(ws):
if ws.current_room:
result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement")
users = result["objects"]["user"]
message_block = ""
for message in reversed(result["objects"]["message"]):
user = contentapi.get_user_or_default(users, message["createUserId"])
message_block += get_message_string(ws, message, user) + os.linesep
return message_block.rstrip(os.linesep)
return None
# Printing websocket event output is tricky because we don't want to interrupt user input (we don't have
# curses). As such, we must buffer our output IF we are asked to pause
def ws_print(ws, output):
# Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up
# the printing thread automatically
ws.output_buffer.put(output)
# Loads the config from file into the global config var. If the file
# doesn't exist, the file is created from the defaults in config.
# The function returns nothing
def load_or_create_global_config():
global config
# Check if the config file exists
if os.path.isfile(CONFIGFILE):
# Read and deserialize the config file
with open(CONFIGFILE, 'r', encoding='utf-8') as f:
temp_config = toml.load(f)
myutils.merge_dictionary(temp_config, config)
else:
# Serialize and write the config dictionary to the config file
logging.warn("No config found at " + CONFIGFILE + ", creating now")
with open(CONFIGFILE, 'w', encoding='utf-8') as f:
toml.dump(config, f)
myutils.set_logging_level(config["default_loglevel"])
# Set the room to listen to on the websocket. Will also update the userlist, if
# it's appropriate to do so
def set_current_room(ws, roomid):
try:
ws.current_room_data = ws.context.get_by_id("content", roomid)
ws.current_room = roomid
# Generate the new user status list. Must always send the full list every time.
statuses = { "%d" % roomid : "active" }
if ws.main_config["appear_in_global"]:
statuses["0"] = "active"
ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses))
print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL)
return
except Exception as ex:
print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL)
# Enter a search loop which will repeat until you quit. Output should be PAUSED here
# (but someone else does it for us, we don't even know what 'pausing' is)
def search(ws):
while True:
searchterm = input("Search text (#ROOMNUM = set room, # to quit): ")
if searchterm == "#":
return
match = re.match(r'#(\d+)', searchterm)
if match:
roomid = int(match.group(1))
set_current_room(ws, roomid)
return
elif searchterm:
# Go search for rooms and display them
result = ws.context.basic_search(searchterm)["objects"]["content"]
if len(result):
for content in result:
printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"])
else:
printr(Style.DIM + " -- No results -- ")
# Either pull the token from a file, or get the login from the command
# line if that doesn't work. WILL test your token against the real API
# even if it's pulled from file!
def authenticate(config, context: contentapi.ApiContext):
message = "No token file found"
if os.path.isfile(config["tokenfile"]):
with open(config["tokenfile"], 'r') as f:
token = f.read()
logging.debug("Token from file: " + token)
context.token = token
if context.is_token_valid():
logging.info("Logged in using token file " + config["tokenfile"])
return
else:
message = "Token file expired"
message += ", Please enter login for " + config["api"]
while True:
print(message)
username = input("Username: ")
password = getpass.getpass("Password: ")
try:
token = context.login(username, password, config["expire_seconds"])
with open(config["tokenfile"], 'w') as f:
f.write(token)
logging.info("Token accepted, written to " + config["tokenfile"])
context.token = token
return
except Exception as ex:
print("ERROR: %s" % ex)
message = "Please try logging in again:"
def print_statusline(ws):
# if ws_context.connected: bg = Back.GREEN else: bg = Back.RED
if ws.current_room:
name = ws.current_room_data["name"]
room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'"
else:
room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK
print(Back.GREEN + Fore.BLACK + os.linesep + " " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL)
# Print and then reset the style
def printr(msg):
print(msg + Style.RESET_ALL)
# Because python reasons
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# App launcher
def init():
import sys
sys.dont_write_bytecode = True
import os
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
s = os.path.realpath(sys.executable)
else:
s = os.path.realpath(__file__)
sys.path.insert(0,os.path.join(os.path.dirname(s),"user","modules"))
global app
import app
app.init(s,"pycapi") # Change the second argument to your app name, I recommend using your domain if you have one. For example, If you own "example.com", and the program is called "app", then you should name it com.example.app.
init()
if __name__ == "__main__":
app.mod["main"].main()

View File

@ -4,5 +4,5 @@ REM Change python whatever
set pyexe=python34\python.exe
REM And now, run all the various tests we have
%pyexe% test_myutils.py
%pyexe% test_contentapi.py
%pyexe% user\modules\test_myutils.py
%pyexe% user\modules\test_contentapi.py

1
user/modules.toml Normal file
View File

@ -0,0 +1 @@
main = "$DISTRO.main"

65
user/modules/app.py Normal file
View File

@ -0,0 +1,65 @@
import sys
import os
def initConfigDirs():
for i in configDirs:
fi = os.path.join(i,"modules")
try:
del sys.path[sys.path.index(fi)]
except Exception:
pass
for i in reversed(configDirs):
sys.path.insert(0,os.path.join(i,"modules"))
def configFindFile(name):
for i in configDirs:
fi = os.path.join(i,name)
if os.path.isfile(fi): yield fi
def init(scp,dist):
global mod,distro,p,s,sd,configDirs
mod = {}
distro = dist
p = os.path.join
s = scp
sd = os.path.dirname(s)
configDirs = [
p(os.environ.get("HOME",os.environ.get("USERPROFILE",None)),".config",distro),
p(sd,"user")
]
initConfigDirs()
modules = {}
import toml
for file in configFindFile("modules.toml"):
tbl = toml.loads(open(file).read())
for i in tbl:
if not i in modules:
modules[i] = tbl[i]
length = len(sys.argv)
index = 1
while index < length:
arg = sys.argv[index]
if arg.startswith("--app:module"):
arg = arg.split("--app:module:",1)[1].split("=",1)
modules[arg[0]] = arg[1].replace("$DISTRO",distro)
del sys.argv[index]
length -= 1
continue
index += 1
import importlib
glb = globals()
for module in modules:
modules[module] = modules[module].replace("$DISTRO",distro)
print("* Importing '" +modules[module]+ "' -> '" +module+ "' ...")
mod[module] = importlib.import_module(modules[module])
for module in modules:
if hasattr(mod[module],"appInit"):
print("* Initializing '" +module+ "' ...")
mod[module].appInit()
print("* Running app.")

379
user/modules/pycapi/main.py Normal file
View File

@ -0,0 +1,379 @@
import os
import json
import logging
import getpass
import textwrap
import threading
import platform
import queue
import re
import toml
import readchar
import websocket
#import win_unicode_console
from collections import OrderedDict
from colorama import Fore, Back, Style, init as colorama_init, ansi as colorama_ansi
import contentapi
import myutils
VERSION="0.2.0" # Arbitrary but whatever
CONFIGFILE="config.toml"
MAXTITLE=25
MSGPREFIX=Back.GREEN + " " + Back.RESET + " "
# The entire config object with all defaults
config = {
"api" : "http://localhost:5000/api",
"default_loglevel" : "WARNING",
"websocket_trace" : False,
"default_room" : 0, # Zero means it will ask you for a room
"default_markup" : "plaintext",
"expire_seconds" : 31536000, # 365 days in seconds, expiration for token
"appear_in_global" : False,
"print_status_after_insert" : True,
"output_buffer_timeout" : 0.05, # 50 milliseconds
"default_history" : 100,
"fixed_width" : 76, # Need room for the pre-message decoration
"tokenfile" : ".qcstoken"
}
# The command dictionary (only used to display help)
commands = OrderedDict([
("h", "Help, prints this menu!"),
("s", "Search, find and/or set a room to listen to (one at a time!)"),
("g", "Global userlist, print users using contentapi in general"),
("u", "Userlist, print users in the current room"),
("i", "Insert mode, allows you to send a message (pauses messages!)"),
("c", "Clear; clear screen and load last N messages manually"),
("t", "Statistics, see info about runtime"),
("q", "Quit, no warning!")
])
def main():
print("Program start: %s" % VERSION)
#win_unicode_console.enable()
colorama_init() # colorama init
load_or_create_global_config()
logging.info("Config: " + json.dumps(config, indent = 2))
context = contentapi.ApiContext(config["api"], logging)
logging.info("Testing connection to API at " + config["api"])
logging.debug(json.dumps(context.api_status(), indent = 2))
authenticate(config, context)
# Let users debug the websocket if they want I guess
if config["websocket_trace"]:
websocket.enableTrace(True)
ws = websocket.WebSocketApp(context.websocket_endpoint())
# Might as well reuse the websocket object for my websocket context data (oops, is that bad?)
ws.context = context
ws.user_info = context.user_me()
ws.main_config = config
ws.ignored = {} # Just a tracking list for fun stats
# Output from the websocket can only get truly printed to the screen when this lock is released.
# We grab the lock when the user is in some kind of "input" mode, which blocks the websocket printing
# thread from processing the queue (if it has anything anyway)
ws.output_lock = threading.Lock()
# Output from the websocket is ALWAYS buffered, and we use a thread-safe queue to add and remove
# output safely. We buffer all messages to ensure the order is preserved; if we SOMETIMES queued and
# SOMETIMES did not, we would need to be very careful about whether the queue was empty, which requires
# additional locking and etc.
ws.output_buffer = queue.Queue()
# Note that the current_room stuff is set on open if you've supplied a default room in config
ws.current_room = 0
ws.current_room_data = False
# set the callback functions
ws.on_open = ws_onopen
ws.on_close = ws_onclose
ws.on_message = ws_onmessage
ws.run_forever()
print("Program end")
def ws_onclose(ws):
print("Websocket closed! Exiting (FYI: you were in room %d)" % ws.current_room)
exit()
# When the websocket is opened, we begin our thread to accept input forever
def ws_onopen(ws):
def main_loop():
printr(Fore.GREEN + Style.BRIGHT + os.linesep + "-- Connected to live updates! --")
if not ws.current_room:
printr(Fore.YELLOW + "* You are not connected to any room! Press 'S' to search for a room! *")
print_statusline(ws)
# The infinite input loop! Or something!
while True:
printstatus = False # Assume we are not printing the status every time (it's kinda annoying)
# Listen for a key. The implementation of 'readkey()' on this version of python and windows makes the CPU spike
# to 100%, so we switch to getch() if we detect windows.
if platform.system().startswith("Windows"):
import msvcrt
logging.debug("on windows, using msvcrt.getch")
key = msvcrt.getch().decode("utf-8")
else:
key = readchar.readkey()
# We get exclusive access to output while we're handling user input. This allows us to "pause" any output
# the threaded output queue might want to do (we're more important)
with ws.output_lock:
if key == "h":
print(" -- Help menu / Controls --")
for key, value in commands.items():
print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value)
elif key == "s":
search(ws)
printstatus = True
elif key == "g":
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_global"))
elif key == "u":
if not ws.current_room:
print("You're not in a room! Can't check userlist!")
else:
# Just send it out, we have to wait for the websocket handler to get the response
ws.send(ws.context.gen_ws_request("userlist", id = "userlist_room_%d" % ws.current_room))
elif key == "i":
if not ws.current_room:
print("You're not in a room! Can't send messages!")
else:
message = input("Post (empty = exit): ")
if message:
ws.context.post_message(contentapi.comment_builder(message,
ws.current_room, ws.main_config["default_markup"], ws.user_info["avatar"]))
printstatus = ws.main_config["print_status_after_insert"]
elif key == "c":
print(colorama_ansi.clear_screen() + " -- Pulling message history: %d -- " % ws.main_config["default_history"])
print(get_message_history_string(ws))
printstatus = True
elif key == "t":
print(" -- Ignored WS Data (normal) --")
for key,value in ws.ignored.items():
printr(Style.BRIGHT + ("%16s" % key) + (" : %d" % value))
elif key == "q":
print("Quitting (may take a bit for the websocket to close)")
ws.close()
break
elif key == " ":
printstatus = True
# At the end of the loop, but still in the printing lock, print the status line (if they want)
if printstatus:
print_statusline(ws)
# Just a simple infinite loop which blocks on the queue until something is available
def ws_print_loop():
while True:
next_output = ws.output_buffer.get()
with ws.output_lock:
printr(next_output)
# Set the main room; we want to wait until the websocket is open because this also sets your
# status in the userlist
if ws.main_config["default_room"]:
set_current_room(ws, ws.main_config["default_room"])
# create a thread to run the blocking task
mainthread = threading.Thread(target=main_loop)
mainthread.start()
# create a thread to process websocket output
printthread = threading.Thread(target=ws_print_loop)
printthread.daemon = True
printthread.start()
# Message handler for our websocket; will handle live messages for the room you're listening to and
# userlist updates request results, but not much else (for now)
def ws_onmessage(ws, message):
logging.debug("WSRCV: " + message)
result = json.loads(message)
# Someone asked for the userlist, check the id to figure out what to print and which list to see
if result["type"] == "userlist":
all_statuses = result["data"]["statuses"]
if result["id"] == "userlist_global":
usermessage = " -- Global userlist --"
statuses = all_statuses["0"] if "0" in all_statuses else {}
else: # This is a bad assumption, it should parse the room id out of the id instead (maybe?)
usermessage = " -- Userlist for %s -- " % ws.current_room_data["name"]
statuses = all_statuses[str(ws.current_room)] if str(ws.current_room) in all_statuses else {}
userlist_output = usermessage
for key,value in statuses.items():
key = int(key)
user = contentapi.get_user_or_default(result["data"]["objects"]["user"], key)
userlist_output += os.linesep + Style.BRIGHT + " " + ("%s" % (user["username"] + Style.DIM + " #%d" % key)) + Style.RESET_ALL + " - " + value
ws_print(ws, userlist_output)
return
# Live updates are messages, edits, user updates, etc. Check the event list to see
elif result["type"] == "live":
# We only care about SOME live updates
for event in result["data"]["events"]:
if event["type"] == "message_event" and event["action"] == contentapi.CREATE: # I think this is a new message
objects = result["data"]["objects"]["message_event"]
message = contentapi.get_message(objects["message"], event["refId"])
if message and message["contentId"] == ws.current_room:
# OK we're DEFINITELY displaying it now
user = contentapi.get_user_or_default(objects["user"], message["createUserId"])
ws_print(ws, get_message_string(ws, message, user))
# Track ignored data
if result["type"] not in ws.ignored:
ws.ignored[result["type"]] = 0
ws.ignored[result["type"]] += 1
# Produce the string output for a given message. Can be printed directly to console
def get_message_string(ws, message, user):
result = (MSGPREFIX + Fore.CYAN + Style.BRIGHT + user["username"] + " " + Style.DIM + "#%d" % user["id"] +
Fore.MAGENTA + " " + message["createDate"] + " [%d]" % message["id"] + os.linesep + Style.RESET_ALL)
for t in textwrap.wrap(message["text"], width = ws.main_config["fixed_width"]):
result += (MSGPREFIX + t + os.linesep)
return result.rstrip(os.linesep)
# Produce a large string of output for all history in the current room. Can be printed directly to console
def get_message_history_string(ws):
if ws.current_room:
result = ws.context.basic_message_history(ws.current_room, ws.main_config["default_history"], "~engagement")
users = result["objects"]["user"]
message_block = ""
for message in reversed(result["objects"]["message"]):
user = contentapi.get_user_or_default(users, message["createUserId"])
message_block += get_message_string(ws, message, user) + os.linesep
return message_block.rstrip(os.linesep)
return None
# Printing websocket event output is tricky because we don't want to interrupt user input (we don't have
# curses). As such, we must buffer our output IF we are asked to pause
def ws_print(ws, output):
# Queueing is supposed to be threadsafe, so just slap a new one in there. This will wake up
# the printing thread automatically
ws.output_buffer.put(output)
# Loads the config from file into the global config var. If the file
# doesn't exist, the file is created from the defaults in config.
# The function returns nothing
def load_or_create_global_config():
global config
# Check if the config file exists
if os.path.isfile(CONFIGFILE):
# Read and deserialize the config file
with open(CONFIGFILE, 'r', encoding='utf-8') as f:
temp_config = toml.load(f)
myutils.merge_dictionary(temp_config, config)
else:
# Serialize and write the config dictionary to the config file
logging.warn("No config found at " + CONFIGFILE + ", creating now")
with open(CONFIGFILE, 'w', encoding='utf-8') as f:
toml.dump(config, f)
myutils.set_logging_level(config["default_loglevel"])
# Set the room to listen to on the websocket. Will also update the userlist, if
# it's appropriate to do so
def set_current_room(ws, roomid):
try:
ws.current_room_data = ws.context.get_by_id("content", roomid)
ws.current_room = roomid
# Generate the new user status list. Must always send the full list every time.
statuses = { "%d" % roomid : "active" }
if ws.main_config["appear_in_global"]:
statuses["0"] = "active"
ws.send(ws.context.gen_ws_request("setuserstatus", data = statuses))
print(Fore.GREEN + "Set room to %s" % ws.current_room_data["name"] + Style.RESET_ALL)
return
except Exception as ex:
print(Fore.RED + "Couldn't find room with id %d" % roomid + Style.RESET_ALL)
# Enter a search loop which will repeat until you quit. Output should be PAUSED here
# (but someone else does it for us, we don't even know what 'pausing' is)
def search(ws):
while True:
searchterm = input("Search text (#ROOMNUM = set room, # to quit): ")
if searchterm == "#":
return
match = re.match(r'#(\d+)', searchterm)
if match:
roomid = int(match.group(1))
set_current_room(ws, roomid)
return
elif searchterm:
# Go search for rooms and display them
result = ws.context.basic_search(searchterm)["objects"]["content"]
if len(result):
for content in result:
printr(Style.BRIGHT + "%7s" % ("#%d" % content["id"]) + Style.RESET_ALL + " - %s" % content["name"])
else:
printr(Style.DIM + " -- No results -- ")
# Either pull the token from a file, or get the login from the command
# line if that doesn't work. WILL test your token against the real API
# even if it's pulled from file!
def authenticate(config, context: contentapi.ApiContext):
message = "No token file found"
if os.path.isfile(config["tokenfile"]):
with open(config["tokenfile"], 'r') as f:
token = f.read()
logging.debug("Token from file: " + token)
context.token = token
if context.is_token_valid():
logging.info("Logged in using token file " + config["tokenfile"])
return
else:
message = "Token file expired"
message += ", Please enter login for " + config["api"]
while True:
print(message)
username = input("Username: ")
password = getpass.getpass("Password: ")
try:
token = context.login(username, password, config["expire_seconds"])
with open(config["tokenfile"], 'w') as f:
f.write(token)
logging.info("Token accepted, written to " + config["tokenfile"])
context.token = token
return
except Exception as ex:
print("ERROR: %s" % ex)
message = "Please try logging in again:"
def print_statusline(ws):
# if ws_context.connected: bg = Back.GREEN else: bg = Back.RED
if ws.current_room:
name = ws.current_room_data["name"]
room = "'" + (name[:(MAXTITLE - 3)] + '...' if len(name) > MAXTITLE else name) + "'"
else:
room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK
print(Back.GREEN + Fore.BLACK + os.linesep + " " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i c t q " + Style.RESET_ALL)
# Print and then reset the style
def printr(msg):
print(msg + Style.RESET_ALL)