From e0393cf4107e8466be1c99684a269c99c1e5be5f Mon Sep 17 00:00:00 2001 From: Carlos Sanchez Date: Mon, 1 May 2023 10:35:31 -0400 Subject: [PATCH] Setting up more contentapi --- contentapi.py | 39 ++++++++++++++++++++++++++++++++++++++- main.py | 31 ++++++++++++++++++++++++++++--- test.bat | 3 ++- test_contentapi.py | 25 +++++++++++++++++++++++++ 4 files changed, 93 insertions(+), 5 deletions(-) create mode 100644 test_contentapi.py diff --git a/contentapi.py b/contentapi.py index 978adb2..d0f7855 100644 --- a/contentapi.py +++ b/contentapi.py @@ -102,4 +102,41 @@ class ApiContext: # Get information about the API. Very useful to test your connection to the API def api_status(self): - return self.get("status") \ No newline at end of file + return self.get("status") + + def search(self, requests): + return self.post("request", requests) + + # A very basic search for outputting to the console. Many assumptions are made! + def basic_search(self, searchterm, limit = 0): + return self.search({ + "values": { + "searchterm": searchterm, + "searchtermlike": "%" + searchterm + "%" + }, + "requests": [{ + "type": "content", + "fields": "~text,engagement", # All fields EXCEPT text and engagement + "query": "name LIKE @searchtermlike", + "order": "lastActionDate_desc", + "limit": limit + }] + }) + + # Return the singular item of 'type' for the given ID. Raises a "NotFoundError" if nothing found. + def get_by_id(self, type, id, fields = "*"): + result = self.search({ + "values" : { + "id" : id + }, + "requests": [{ + "type" : type, + "fields": fields, + "query": "id = @id" + }] + }) + + things = result["objects"][type] + if not len(things): + raise NotFoundError("Couldn't find %s with id %d" % (type, id)) + return things[0] \ No newline at end of file diff --git a/main.py b/main.py index 55dedc3..bd58cc9 100644 --- a/main.py +++ b/main.py @@ -5,6 +5,7 @@ import logging import getpass import textwrap import threading +import re import toml import readchar import websocket @@ -67,6 +68,7 @@ def main(): # Might as well reuse the websocket object for my websocket context data (oops, is that bad?) ws.user_info = context.user_me() ws.current_room = config["default_room"] + ws.current_room_data = False ws.pause_output = False # Whether all output from the websocket should be paused (including status updates) ws.output_buffer = [] # Individual print statements buffered from output. ws.main_config = config @@ -101,7 +103,9 @@ def ws_onopen(ws): while True: if printstatus: print_statusline(ws) - printstatus = True + + printstatus = True # Allow printing the statusline next time + ws.pause_output = False # Allow arbitrary output again key = readchar.readkey() # # Oops, websocket is not connected but you asked for a command that requires websocket! @@ -109,11 +113,13 @@ def ws_onopen(ws): # print("No websocket connection") # continue + ws.pause_output = True # Disable output for the duration of input handling + if key == "h": for key, value in commands.items(): print(" " + Style.BRIGHT + key + Style.NORMAL + " - " + value) elif key == "s": - print("not yet") + search(ws) elif key == "g": print("not yet") elif key == "u": @@ -137,6 +143,7 @@ def ws_onopen(ws): thread = threading.Thread(target=main_loop) thread.start() + def ws_onmessage(ws, message): pass @@ -160,6 +167,19 @@ def load_or_create_global_config(): myutils.set_logging_level(config["default_loglevel"]) +# Enter a search loop which will repeat until you quit. Output should be PAUSED here +# (but someone else does it for us, we don't even know what 'pausing' is) +def search(ws): + while True: + searchterm = input("Search text (#ROOMNUM = set room, # to quit): ") + if searchterm == "#": + return + match = re.match(r'#(\d+)', searchterm) + if match: + digits = int(match.group(1)) + + num = int(digits) + print(num) # Either pull the token from a file, or get the login from the command # line if that doesn't work. WILL test your token against the real API @@ -196,7 +216,12 @@ def authenticate(config, context: contentapi.ApiContext): def print_statusline(ws): # if ws_context.connected: bg = Back.GREEN else: bg = Back.RED - print(Back.GREEN + Fore.BLACK + "\n User: " + ws.user_info["username"] + " CTRL: h s g u i q " + Style.RESET_ALL) + if ws.current_room: + name = ws.current_room_data["name"] + room = "'" + (name[:12] + '...' if len(name) > 15 else name) + "'" + else: + room = Fore.RED + Style.DIM + "NONE" + Style.NORMAL + Fore.BLACK + print(Back.GREEN + Fore.BLACK + "\n " + ws.user_info["username"] + " - " + room + " CTRL: h s g u i q " + Style.RESET_ALL) # Because python reasons if __name__ == "__main__": diff --git a/test.bat b/test.bat index 6b80946..52275db 100644 --- a/test.bat +++ b/test.bat @@ -4,4 +4,5 @@ REM Change python whatever set pyexe=python34\python.exe REM And now, run all the various tests we have -%pyexe% test_myutils.py \ No newline at end of file +%pyexe% test_myutils.py +%pyexe% test_contentapi.py diff --git a/test_contentapi.py b/test_contentapi.py new file mode 100644 index 0000000..556ec03 --- /dev/null +++ b/test_contentapi.py @@ -0,0 +1,25 @@ + +import unittest +import contentapi +import logging + +class TestContentapi(unittest.TestCase): + + def setUp(self) -> None: + # MAYBE change this some time? + self.api = contentapi.ApiContext("https://oboy.smilebasicsource.com/api", logging) + + def test_apistatus(self): + result = self.api.api_status() + self.assertTrue("version" in result) + + def test_is_token_valid_none(self): + self.assertFalse(self.api.is_token_valid()) + + def test_is_token_valid_garbage(self): + self.api.token = "literalgarbage" + self.assertFalse(self.api.is_token_valid()) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file