diff --git a/README.md b/README.md index e4e4357..3fa87b6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -

collegamento v0.2.0

+

collegamento v0.3.0-rc0

A tool that makes it much easier to make offload work when asyncio isn't an option. diff --git a/collegamento/__init__.py b/collegamento/__init__.py index fdc6461..cc0bbaa 100644 --- a/collegamento/__init__.py +++ b/collegamento/__init__.py @@ -2,12 +2,15 @@ beartype_this_package() -from .files_variant import FileClient, FileServer # noqa: F401, E402 -from .simple_client_server import ( # noqa: F401, E402 +from .client_server import ( # noqa: F401, E402 + COMMANDS_MAPPING, USER_FUNCTION, + Client, CollegamentoError, Request, + RequestQueueType, Response, - SimpleClient, - SimpleServer, + ResponseQueueType, + Server, ) +from .files_variant import FileClient, FileServer # noqa: F401, E402 diff --git a/collegamento/client_server/__init__.py b/collegamento/client_server/__init__.py new file mode 100644 index 0000000..8e4498d --- /dev/null +++ b/collegamento/client_server/__init__.py @@ -0,0 +1,10 @@ +from .client import Client, Server # noqa: F401, E402 +from .utils import ( # noqa: F401, E402 + COMMANDS_MAPPING, + USER_FUNCTION, + CollegamentoError, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) diff --git a/collegamento/client_server/client.py b/collegamento/client_server/client.py new file mode 100644 index 0000000..67b28b4 --- /dev/null +++ b/collegamento/client_server/client.py @@ -0,0 +1,226 @@ +"""Defines the Client and Server class which provides a convenient and easy to use IPC interface. + +>>> def test(*args): +... return +>>> c = Client({"test": (test, True)}) +>>> c.request("test") +>>> c.request("test") +>>> from time import sleep +>>> sleep(1) +>>> res = c.get_response("test") +>>> res +[{...}, {...}] +>>> assert len(res) == 2 +>>> c.kill_IPC() +""" + +from multiprocessing import Process, Queue, freeze_support +from random import randint + +from .server import Server +from .utils import ( + COMMANDS_MAPPING, + USER_FUNCTION, + CollegamentoError, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) + + +class Client: + """An easy to use implementation for IPC in Python. + + The Client class is used to talk to the Server class and run commands as directed. + + The public API includes the following methods: + - Client.add_command(name: str, command: USER_FUNCTION, multiple_requests: bool = False) + - Client.request(command: str, **kwargs) -> None | int + - Client.get_response(command: str) -> Response | list[Response] | None + - Client.kill_IPC() + """ + + def __init__( + self, + commands: COMMANDS_MAPPING = {}, + id_max: int = 15_000, + server_type: type = Server, + ) -> None: + """See tests, examples, and the file variant code to see how to give input. + + The most common input is commands and id_max. server_type is only really useful for wrappers.""" + + self.all_ids: list[int] = [] + self.id_max: int = id_max + + # int corresponds to str and str to int = int -> str & str -> int + self.current_ids: dict[str | int, int | str] = {} + + self.newest_responses: dict[str, list[Response]] = {} + self.server_type: type = server_type + + self.commands: dict[str, tuple[USER_FUNCTION, bool]] = {} + + for command, func in commands.items(): + # We don't check types or length because beartype takes care of that for us + if not isinstance(func, tuple): + self.commands[command] = (func, False) + self.newest_responses[command] = [] + continue + + self.commands[command] = func + self.newest_responses[command] = [] + + self.request_queue: RequestQueueType + self.response_queue: ResponseQueueType + self.main_process: Process + self.create_server() + + def create_server(self): + """Creates a Server and terminates the old one if it exists - internal API""" + freeze_support() + + if hasattr(self, "main_process"): + # If the old Process didn't finish instatiation we need to terminate the Process + # so it doesn't try to access queues that no longer exist + self.main_process.terminate() + + self.check_responses() + self.all_ids = [] # The remaining ones will never have been finished + + self.request_queue = Queue() + self.response_queue = Queue() + self.main_process = Process( + target=self.server_type, + args=( + self.commands, + self.request_queue, + self.response_queue, + ), + daemon=True, + ) + self.main_process.start() + + def create_message_id(self) -> int: + """Creates a Message id - internal API""" + + # In cases where there are many many requests being sent it may be faster to choose a + # random id than to iterate through the list of id's and find an unclaimed one + # NOTE: 0 is reserved for when there's no curent id's (self.current_ids) + id: int = randint(1, self.id_max) + while id in self.all_ids: + id = randint(1, self.id_max) + self.all_ids.append(id) + + if not self.main_process.is_alive(): + # No point in an id if the server's dead + self.create_server() + + return id + + def request(self, command: str, **kwargs) -> None: + """Sends the main process a request of type command with given kwargs - external API""" + + if command not in self.commands: + raise CollegamentoError( + f"Command {command} not in builtin commands. Those are {self.commands}!" + ) + + id: int = self.create_message_id() + + final_request: Request = { + "id": id, + "type": "request", + "command": command, + } + final_request.update(**kwargs) + + if self.commands[command][1]: + self.current_ids[id] = command + + self.current_ids[command] = id + + self.request_queue.put(final_request) + + def parse_response(self, res: Response) -> None: + """Parses main process output and discards useless responses - internal API""" + id: int = res["id"] + self.all_ids.remove(id) + + if "command" not in res: + return + + command: str = res["command"] + + if command == "add-command": + return + + self.newest_responses[command].append(res) + self.current_ids[command] = 0 + + if self.commands[command][1]: + self.current_ids.pop(id) + return + + if id != self.current_ids[command]: + return + + def check_responses(self) -> None: + """Checks all main process output by calling parse_line() on each response - internal API""" + while not self.response_queue.empty(): + self.parse_response(self.response_queue.get()) + + def get_response(self, command: str) -> Response | list[Response] | None: + """Checks responses and returns the current response of type command if it has been returned - external API""" + if command not in self.commands: + raise CollegamentoError( + f"Cannot get response of command {command}, valid commands are {self.commands}" + ) + + self.check_responses() + response: list[Response] = self.newest_responses[command] + self.newest_responses[command] = [] + if not len(response): + return None + + # If we know that the command doesn't allow multiple requests don't give a list + if not self.commands[command][1]: + return response[0] # Will only ever be one + + return response + + def add_command( + self, + name: str, + command: USER_FUNCTION, + multiple_requests: bool = False, + ) -> None: + if name == "add-command": + raise CollegamentoError( + "Cannot add command add-command as it is a special builtin" + ) + + id: int = self.create_message_id() + final_request: Request = { + "id": id, + "type": "request", + "command": "add-command", + } + command_tuple: tuple[USER_FUNCTION, bool] = ( + command, + multiple_requests, + ) + final_request.update(**{"name": name, "function": command_tuple}) + + self.request_queue.put(final_request) + self.commands[name] = command_tuple + self.newest_responses[name] = [] + + def kill_IPC(self): + """Kills the internal Process and frees up some storage and CPU that may have been used otherwise - external API""" + self.main_process.terminate() + + def __del__(self): + # Multiprocessing bugs arise if the Process is created, not saved, and not terminated + self.main_process.terminate() diff --git a/collegamento/client_server/server.py b/collegamento/client_server/server.py new file mode 100644 index 0000000..a7bff29 --- /dev/null +++ b/collegamento/client_server/server.py @@ -0,0 +1,158 @@ +"""Defines the Server class which is the butter to the bread that is the Client.""" + +from time import sleep + +from beartype.typing import Any + +from .utils import ( + USER_FUNCTION, + Request, + RequestQueueType, + Response, + ResponseQueueType, +) + + +def command_sort_func( + request: Request, priority_commands: list[str] +) -> tuple[bool, int]: + command_index: int = 0 + in_priority_commands: bool = request["command"] in priority_commands + + if in_priority_commands: + command_index = priority_commands.index(request["command"]) + + return ( + not in_priority_commands, + command_index, + ) # It it sorts False before True + + +class Server: + """A basic and multipurpose server that can be easily subclassed for your specific needs.""" + + def __init__( + self, + commands: dict[str, tuple[USER_FUNCTION, bool]], + requests_queue: RequestQueueType, + response_queue: ResponseQueueType, + priority_commands: list[str] = [], # Only used by subclasses + ) -> None: + self.response_queue: ResponseQueueType = response_queue + self.requests_queue: RequestQueueType = requests_queue + self.all_ids: list[int] = [] + self.newest_ids: dict[str, list[int]] = {} + self.newest_requests: dict[str, list[Request]] = {} + self.priority_commands: list[str] = priority_commands + + self.commands: dict[str, tuple[USER_FUNCTION, bool]] = commands + for command, func_tuple in self.commands.items(): + self.newest_ids[command] = [] + self.newest_requests[command] = [] + + while True: + self.run_tasks() + sleep(0.0025) + + def simple_id_response(self, id: int, cancelled: bool = True) -> None: + response: Response = { + "id": id, + "type": "response", + "cancelled": cancelled, + } + self.response_queue.put(response) + + def parse_line(self, message: Request) -> None: + id: int = message["id"] + + if message["type"] != "request": + self.simple_id_response(id) + return + + command: str = message["command"] + + if command == "add-command": + request_name: str = message["name"] # type: ignore + + request_tuple: tuple[USER_FUNCTION, bool] = message["function"] # type: ignore + self.commands[request_name] = request_tuple + self.newest_requests[request_name] = [] + self.newest_ids[request_name] = [] + self.simple_id_response(id) + return + + self.all_ids.append(id) + + if not self.commands[command][1]: + self.newest_ids[command] = [] + self.newest_requests[command] = [] + + self.newest_ids[command].append(id) + self.newest_requests[command].append(message) + + def cancel_old_ids(self) -> None: + accepted_ids: list[int] = [ + request["id"] + for request_list in list(self.newest_requests.values()) + for request in request_list + ] + + for request in self.all_ids: + if request in accepted_ids: + continue + + self.simple_id_response(request) + + self.all_ids = [] + + def handle_request(self, request: Request) -> None: + command: str = request["command"] + id: int = request["id"] + result: Any # noqa: F842 + + command = request["command"] + response: Response = { + "id": id, + "type": "response", + "cancelled": False, + "command": command, + } + + if command not in self.commands: + response["result"] = None + response["cancelled"] = True + else: + response["result"] = self.commands[command][0](self, request) + + self.response_queue.put(response) + self.newest_ids[command].remove(id) + + def run_tasks(self) -> None: + if self.requests_queue.empty(): + return + + while not self.requests_queue.empty(): + self.parse_line(self.requests_queue.get()) + + self.cancel_old_ids() + + requests_list: list[Request] = [ + request + for request_list in self.newest_requests.values() + if request_list + for request in request_list + ] + + requests_list = sorted( + requests_list, + key=lambda request: command_sort_func( + request, self.priority_commands + ), + ) + + for request in requests_list: + if request is None: + continue + command: str = request["command"] + self.handle_request(request) + self.newest_requests[command].remove(request) diff --git a/collegamento/simple_client_server/misc.py b/collegamento/client_server/utils.py similarity index 69% rename from collegamento/simple_client_server/misc.py rename to collegamento/client_server/utils.py index 6f5e7d4..c183385 100644 --- a/collegamento/simple_client_server/misc.py +++ b/collegamento/client_server/utils.py @@ -4,11 +4,12 @@ from beartype.typing import Callable +# TODO: Move away from TypedDict class Message(TypedDict): """Base class for messages in and out of the server""" id: int - type: str # Can be "request", "response", "notification" + type: str # Can be "request" or "response" class Request(Message): @@ -25,16 +26,21 @@ class Response(Message): result: NotRequired[Any] -USER_FUNCTION = Callable[["SimpleServer", Request], Any] # type: ignore +class CollegamentoError(Exception): ... # I don't like the boilerplate either + + +USER_FUNCTION = Callable[["Server", Request], Any] # type: ignore +COMMANDS_MAPPING = dict[ + str, USER_FUNCTION | tuple[USER_FUNCTION, bool] +] # if bool is true the command allows multiple requests + if TYPE_CHECKING: ResponseQueueType = GenericQueueClass[Response] RequestQueueType = GenericQueueClass[Request] -# Else, this is CPython < 3.12. We are now in the No Man's Land -# of Typing. In this case, avoid subscripting "GenericQueue". Ugh. +# "Else this is CPython < 3.12. We are now in the No Man's Land +# of Typing. In this case, avoid subscripting "GenericQueue". Ugh." +# - @leycec, maintainer of the amazing @beartype else: ResponseQueueType = GenericQueueClass RequestQueueType = GenericQueueClass - - -class CollegamentoError(Exception): ... # I don't like the boilerplate either diff --git a/collegamento/files_variant.py b/collegamento/files_variant.py index 5e81417..835cff2 100644 --- a/collegamento/files_variant.py +++ b/collegamento/files_variant.py @@ -1,13 +1,14 @@ -from logging import Logger -from multiprocessing.queues import Queue as GenericQueueClass from typing import NotRequired -from .simple_client_server import ( +from .client_server import ( + COMMANDS_MAPPING, USER_FUNCTION, + Client, CollegamentoError, Request, - SimpleClient, - SimpleServer, + RequestQueueType, + ResponseQueueType, + Server, ) @@ -20,120 +21,87 @@ def update_files(server: "FileServer", request: Request) -> None: file: str = request["file"] # type: ignore if request["remove"]: # type: ignore - server.logger.info(f"File {file} was requested for removal") server.files.pop(file) - server.logger.info(f"File {file} has been removed") - else: - contents: str = request["contents"] # type: ignore - server.files[file] = contents - server.logger.info(f"File {file} has been updated with new contents") + return + contents: str = request["contents"] # type: ignore + server.files[file] = contents -class FileClient(SimpleClient): + +class FileClient(Client): """File handling variant of SImpleClient. Extra methods: - FileClient.update_file() - FileClient.remove_file() """ def __init__( - self, commands: dict[str, USER_FUNCTION], id_max: int = 15_000 + self, commands: COMMANDS_MAPPING, id_max: int = 15_000 ) -> None: self.files: dict[str, str] = {} - commands["FileNotification"] = update_files + commands["FileNotification"] = (update_files, True) super().__init__(commands, id_max, FileServer) - self.priority_commands = ["FileNotification"] - def create_server(self) -> None: """Creates the main_server through a subprocess - internal API""" super().create_server() - self.logger.info("Copying files to server") - files_copy = self.files.copy() - self.files = {} - for file, data in files_copy.items(): + for file, data in self.files.items(): self.update_file(file, data) - self.logger.debug("Finished copying files to server") - def request( - self, - request_details: dict, - ) -> None: - if "file" in request_details: - file = request_details["file"] - if file not in self.files: - self.logger.exception( - f"File {file} not in files! Files are {self.files.keys()}" - ) - raise Exception( - f"File {file} not in files! Files are {self.files.keys()}" - ) - - super().request(request_details) + def request(self, command: str, **kwargs) -> None: + file: str | None = kwargs.get("file") + if file and file not in self.files: + raise CollegamentoError( + f"File {file} not in files! Files are {self.files.keys()}" + ) + + super().request(command, **kwargs) def update_file(self, file: str, current_state: str) -> None: """Updates files in the system - external API""" - self.logger.info(f"Updating file: {file}") self.files[file] = current_state - self.logger.debug("Creating notification dict") - file_notification: dict = { - "command": "FileNotification", - "file": file, - "remove": False, - "contents": self.files[file], - } - - self.logger.debug("Notifying server of file update") - super().request(file_notification) + super().request( + "FileNotification", + file=file, + remove=False, + contents=self.files[file], + ) def remove_file(self, file: str) -> None: """Removes a file from the main_server - external API""" if file not in list(self.files.keys()): - self.logger.exception( - f"Cannot remove file {file} as file is not in file database!" - ) raise CollegamentoError( f"Cannot remove file {file} as file is not in file database!" ) - self.logger.info("Notifying server of file deletion") - file_notification: dict = { - "command": "FileNotification", - "file": file, - "remove": True, - } - self.logger.debug("Notifying server of file removal") - super().request(file_notification) + super().request("FileNotification", file=file, remove=True) -class FileServer(SimpleServer): +class FileServer(Server): """File handling variant of SimpleServer""" def __init__( self, - commands: dict[str, USER_FUNCTION], - response_queue: GenericQueueClass, - requests_queue: GenericQueueClass, - logger: Logger, + commands: dict[str, tuple[USER_FUNCTION, bool]], + requests_queue: RequestQueueType, + response_queue: ResponseQueueType, ) -> None: self.files: dict[str, str] = {} super().__init__( commands, - response_queue, requests_queue, - logger, + response_queue, ["FileNotification"], ) def handle_request(self, request: Request) -> None: if "file" in request and request["command"] != "FileNotification": - file = request["file"] - request["file"] = self.files[file] + request["file"] = self.files[request["file"]] super().handle_request(request) diff --git a/collegamento/simple_client_server/__init__.py b/collegamento/simple_client_server/__init__.py deleted file mode 100644 index e49df62..0000000 --- a/collegamento/simple_client_server/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .client import SimpleClient, SimpleServer # noqa: F401, E402 -from .misc import ( # noqa: F401, E402 - USER_FUNCTION, - CollegamentoError, - Request, - RequestQueueType, - Response, - ResponseQueueType, -) diff --git a/collegamento/simple_client_server/client.py b/collegamento/simple_client_server/client.py deleted file mode 100644 index e336f42..0000000 --- a/collegamento/simple_client_server/client.py +++ /dev/null @@ -1,189 +0,0 @@ -from logging import Logger, getLogger -from multiprocessing import Process, Queue, freeze_support -from random import randint - -from .misc import ( - USER_FUNCTION, - CollegamentoError, - Request, - RequestQueueType, - Response, - ResponseQueueType, -) -from .server import SimpleServer - - -class SimpleClient: - """The IPC class is used to talk to the server and run commands. The public API includes the following methods: - - SimpleClient.request() - - SimpleClient.add_command() - - SimpleClient.kill_IPC() - """ - - def __init__( - self, - commands: dict[str, USER_FUNCTION], - id_max: int = 15_000, - server_type: type = SimpleServer, - ) -> None: - self.all_ids: list[int] = [] - self.id_max = id_max - self.current_ids: dict[str, int] = {} - self.newest_responses: dict[str, Response | None] = {} - self.server_type: type[SimpleServer] = server_type - - self.commands = commands - for command in self.commands: - self.current_ids[command] = 0 - self.newest_responses[command] = None - - self.logger: Logger = getLogger("IPC") - self.logger.info("Creating server") - self.response_queue: ResponseQueueType = Queue() - self.requests_queue: RequestQueueType = Queue() - self.main_server: Process - self.create_server() - self.logger.info("Initialization is complete") - - def create_server(self) -> None: - """Creates the main_server through a subprocess - internal API""" - freeze_support() - server_logger = getLogger("Server") - self.main_server = Process( - target=self.server_type, - args=( - self.commands, - self.response_queue, - self.requests_queue, - server_logger, - ), - daemon=True, - ) - self.main_server.start() - self.logger.info("Server created") - - def create_message_id(self) -> int: - """Creates a Message based on the args and kwawrgs provided. Highly flexible. - internal API""" - self.logger.info("Creating message for server") - id = randint(1, self.id_max) # 0 is reserved for the empty case - while id in self.all_ids: - id = randint(1, self.id_max) - self.all_ids.append(id) - - self.logger.debug("ID for message created") - - if not self.main_server.is_alive(): - # No point in an id if the server's dead - self.logger.critical( - "Server was killed at some point, creating server" - ) - self.create_server() - - return id - - def request( - self, - request_details: dict, - ) -> None: - """Sends the main_server a request of type command with given kwargs - external API""" - self.logger.debug("Beginning request") - - # NOTE: this variable could've been a standalone line but I thought it would just be better - # to use the walrus operator. No point in a language feature if its never used. Plus, - # it also looks quite nice :D - if (command := request_details["command"]) not in self.commands: - self.logger.exception( - f"Command {command} not in builtin commands. Those are {self.commands}!" - ) - raise CollegamentoError( - f"Command {command} not in builtin commands. Those are {self.commands}!" - ) - - self.logger.info("Creating request for server") - - id: int = self.create_message_id() - - self.current_ids[command] = id - final_request: Request = { - "id": id, - "type": "request", - "command": command, - } - final_request.update(request_details) # type: ignore - self.logger.debug(f"Request created: {final_request}") - - self.requests_queue.put(final_request) - self.logger.info("Message sent") - - def parse_response(self, res: Response) -> None: - """Parses main_server output line and discards useless responses - internal API""" - self.logger.debug("Parsing server response") - id = res["id"] - self.all_ids.remove(id) - - if "command" not in res: - self.logger.info("Response was notification response") - return - - command = res["command"] - - if command == "add-command": - return - - if id != self.current_ids[command]: - self.logger.info("Response is from old request") - return - - self.logger.info(f"Response is useful for command type: {command}") - self.current_ids[command] = 0 - self.newest_responses[command] = res - - def check_responses(self) -> None: - """Checks all main_server output by calling IPC.parse_line() on each response - internal API""" - self.logger.debug("Checking responses") - while not self.response_queue.empty(): - self.parse_response(self.response_queue.get()) - - def get_response(self, command: str) -> Response | None: - """Checks responses and returns the current response of type command if it has been returned - external API""" - self.logger.info(f"Getting response for type: {command}") - if command not in self.commands: - self.logger.exception( - f"Cannot get response of command {command}, valid commands are {self.commands}" - ) - raise CollegamentoError( - f"Cannot get response of command {command}, valid commands are {self.commands}" - ) - - self.check_responses() - response: Response | None = self.newest_responses[command] - self.newest_responses[command] = None - self.logger.info("Response retrieved") - return response - - def add_command(self, name: str, command: USER_FUNCTION) -> None: - if name == "add-command": - self.logger.exception( - "Cannot add command add-command as it is a special builtin" - ) - raise CollegamentoError( - "Cannot add command add-command as it is a special builtin" - ) - - id: int = self.create_message_id() - final_request: Request = { - "id": id, - "type": "request", - "command": "add-command", - } - final_request.update({"name": name, "function": command}) # type: ignore - self.logger.debug(f"Add Command Request created: {final_request}") - - self.requests_queue.put(final_request) - self.logger.info("Message sent") - self.commands[name] = command - - def kill_IPC(self) -> None: - """Kills the main_server when salve_ipc's services are no longer required - external API""" - self.logger.info("Killing server") - self.main_server.kill() diff --git a/collegamento/simple_client_server/server.py b/collegamento/simple_client_server/server.py deleted file mode 100644 index 718df1a..0000000 --- a/collegamento/simple_client_server/server.py +++ /dev/null @@ -1,170 +0,0 @@ -from logging import Logger -from multiprocessing.queues import Queue as GenericQueueClass -from time import sleep -from typing import Any - -from .misc import ( - USER_FUNCTION, - Request, - RequestQueueType, - Response, - ResponseQueueType, -) - - -class SimpleServer: - """Handles input from the user and returns output from special functions. Not an external API.""" - - def __init__( - self, - commands: dict[str, USER_FUNCTION], - response_queue: GenericQueueClass, - requests_queue: GenericQueueClass, - logger: Logger, - priority_commands: list[str] = [], - ) -> None: - self.logger: Logger = logger - self.logger.info("Starting server setup") - - self.response_queue: ResponseQueueType = response_queue - self.requests_queue: RequestQueueType = requests_queue - self.all_ids: list[int] = [] - self.newest_ids: dict[str, int] = {} - self.newest_requests: dict[str, Request | None] = {} - self.priority_commands: list[str] = priority_commands - - self.commands: dict[str, USER_FUNCTION] = commands - for command in self.commands: - self.newest_ids[command] = 0 - self.newest_requests[command] = None - - self.logger.info("Server setup complete") - - while True: - self.run_tasks() - sleep(0.0025) - - def simple_id_response(self, id: int, cancelled: bool = True) -> None: - self.logger.debug(f"Creating simple response for id {id}") - response: Response = { - "id": id, - "type": "response", - "cancelled": cancelled, - } - self.logger.debug(f"Sending simple response for id {id}") - self.response_queue.put(response) - self.logger.info(f"Simple response for id {id} sent") - - def parse_line(self, message: Request) -> None: - self.logger.debug("Parsing Message from user") - id: int = message["id"] - - if message["type"] != "request": - self.logger.warning( - f"Unknown type {type}. Sending simple response" - ) - self.simple_id_response(id) - self.logger.debug(f"Simple response for id {id} sent") - return - - self.logger.info(f"Mesage with id {id} is of type request") - self.all_ids.append(id) - command: str = message["command"] # type: ignore - self.newest_ids[command] = id - self.newest_requests[command] = message # type: ignore - self.logger.debug("Request stored for parsing") - - def cancel_all_ids_except_newest(self) -> None: - self.logger.info("Cancelling all old id's") - - # NOTE: It used to be list comprehension but that was ugly - ids = [] - for request in list(self.newest_requests.values()): - if request is not None: - ids.append(request["id"]) - - for request in self.all_ids: - if request in ids: - self.logger.debug(f"Id {request} is newest of its command") - continue - - self.logger.debug( - f"Id {request} is an old request, sending simple respone" - ) - self.simple_id_response(request) - - self.all_ids = [] - self.logger.debug("All ids list reset") - - def handle_request(self, request: Request) -> None: - command: str = request["command"] - id: int = self.newest_ids[command] - result: Any # noqa: F842 - - command = request["command"] - response: Response = { - "id": id, - "type": "response", - "cancelled": False, - "command": command, - } - - if command == "add-command": - self.commands[request["name"]] = request["function"] # type: ignore - response["result"] = None - response["cancelled"] = True - self.logger.debug("Response created") - self.response_queue.put(response) - self.newest_ids[command] = 0 - self.logger.info(f"Response sent for request of command {command}") - return - - if command not in self.commands: - self.logger.warning(f"Command {command} not recognized") - response["result"] = None - response["cancelled"] = True - else: - self.logger.debug(f"Running user function for command {command}") - response["result"] = self.commands[command](self, request) - - self.logger.debug("Response created") - self.response_queue.put(response) - self.newest_ids[command] = 0 - self.logger.info(f"Response sent for request of command {command}") - - def run_tasks(self) -> None: - if self.requests_queue.empty(): - return - - self.logger.debug("New request in queue") - while not self.requests_queue.empty(): - self.logger.debug("Parsing request") - self.parse_line(self.requests_queue.get()) - - if not self.all_ids: - self.logger.debug("All requests were notifications") - - self.logger.debug("Cancelling all old id's") - self.cancel_all_ids_except_newest() - - # Actual work - requests_list: list[Request] = [ - request - for request in self.newest_requests.values() - if request is not None - ] - requests_list = sorted( - requests_list, - key=lambda request: ( - request["command"] not in self.priority_commands, - ), - ) - - for request in requests_list: - if request is None: - continue - command: str = request["command"] - self.logger.info(f"Handling request of command {command}") - self.handle_request(request) - self.newest_requests[command] = None - self.logger.debug("Request completed") diff --git a/docs/source/classes.rst b/docs/source/classes.rst index 40bde22..9805fa7 100644 --- a/docs/source/classes.rst +++ b/docs/source/classes.rst @@ -14,32 +14,44 @@ The ``CollegamentoError`` class is a simple error class for ``Collegamento``. ``Request`` *********** -The ``Request`` class is a TypedDict meant to provide a framework for items given to functions used by the IPC. It *will* almsot always contain extra items regardless of the fact that that's not supposed to happen (just add ``# type: ignore`` to the end of the line to shut up the langserver). The data provided will not be typed checked to make sure its proper. The responsibility of data rests on the user. +The ``Request`` class is a TypedDict meant to provide a framework for items given to functions used by the IPC. It *will* almsot always contain extra items regardless of the fact that they aren't outlined in the ``TypedDict`` (just add ``# type: ignore`` to the end of the line to shut up the langserver). The data provided will not be typed checked to make sure its proper. The responsibility of data rests on the user. .. _Response Overview: ``Response`` ************ -The ``Response`` class is what is returned by the "ref:`SimpleClient Overview` or one of it's variants to the user. The useful data is found at ``some_response["result"]``. +The ``Response`` class is what is returned by the "ref:`Client Overview` or one of it's variants to the user. The useful data is found at ``some_response["result"]``. -.. _SimpleClient Overview: +.. _Client Overview: -``SimpleClient`` +``Client`` **************** -The ``SimpleClient`` class can do: +The ``Client`` class can do: -- ``SimpleClient.request(request_details: dict)`` (all details in request_details are specific to the command in the request_details) -- ``SimpleClient.add_command(name: str, command: USER_FUNCTION)`` (adds the function with the name provided that takes input of :ref:`Request Overview` and returns anything`` -- ``SimpleClient.kill_IPC()`` (kills the IPC server) +- ``Client.request(command: str, **kwargs)`` +- ``Client.add_command(name: str, command: USER_FUNCTION, multiple_requests: bool = False)`` (adds the function with the name provided that takes input of :ref:`Request Overview` and returns anything) +- ``Client.get_response(command: str) -> Response | list[Response] | None`` (returns a list of ``Response``'s if the command allows multiple requests otherwise a single ``Response`` if there is were any responses ohterwise ``None``) +- ``Client.kill_IPC()`` (kills the IPC server) -.. _SimpleServer Overview: +When using the ``Client`` class you give the commands as a dict. Below are the ways it can be specified: + - ``{"foo": foo}`` (this means the command foo can only take the newest request given + - ``{"foo": (foo, False)}`` (this means the command foo can only take the newest request given + - ``{"foo": (foo, True)}`` (this means the command foo can take all requests given (new or old) -``SimpleServer`` +By default ``Collegamento`` assumes you only want the newest request but chooses to still give the option to make multiple requests. For ``.get_response()`` the output changes based on how this was specified by giving ``None`` if there was no response, ``Response`` if the command only allows the newest request, and ``list[Response]`` if it allows multiple regardless of how many times you made a request for it. + +Note that because of the way that the commands are handed to the ``Server`` and run, they can actually modify its attributes and theoretically even the functions the ``Server`` runs. This high flexibility also requires the user to ensure that they properly manage any attributes they mess with. + +When it comes to requesting the server to run a command, you give the command as the first argument and all subsequent args for the function the ``Server`` calls are given as kwargs that are passed on. + +.. _Server Overview: + +``Server`` **************** -The SimpleServer is a backend piece of code made visible for commands that can be given to a ``SimpleClient``. If you want to know more about it, check out the source code ;). +The ``Server`` is a backend piece of code made visible for commands that can be given to a ``Client``. If you want to know more about it, check out the source code ;). The reason it is mentioned is because it is given as an argument to :ref:`USER_FUNCTION Overview`'s and we choose to let type declarations exist. .. _FileClient Overview: @@ -51,11 +63,25 @@ The SimpleServer is a backend piece of code made visible for commands that can b - ``FileClient.update_file(file: str, current_state: str)`` (adds or updates the file with the new contents and notifies server of changes) - ``FileClient.remove_file(file: str)`` (removes the file specified from the system and notifies the server to fo the same) -This class also has some changed functionality. When you make a ``.request()`` and add a file to the request, it chnages the request's name to its contents for the function to use. +This class also has some changed functionality. When you make a ``.request()`` and add a file to the request, it changes the request's file name to its contents for the function to use. This isn't technically necessary as the function called can access the files in the Server and modify them as it pleases since it has full access to all the Server's resources. .. _FileServer Overview: ``FileServer`` ************** -The ``FileServer`` is a backend piece of code made visible for commands that can be given to a ``FileClient``. If you want to know more about it, check out the source code ;). +The ``FileServer`` is a backend piece of code made visible for commands that can be given to a ``FileClient``. See my explanation on :ref:`Server Overview` + +.. _RequestQueueType Overview: + +``RequestQueueType`` +********************* + +A subtype hint for `multiprocessing.Queue[Request]` that is meant to be used for the ``Request`` class that defaults to `multiprocessing.Queue` if the python version is less than 3.12. + +.. _ResponseQueueType Overview: + +``ResponseQueueType`` +********************** + +A subtype hint for `multiprocessing.Queue[Response]` that is meant to be used for the ``Response`` class that defaults to `multiprocessing.Queue` if the python version is less than 3.12. diff --git a/docs/source/conf.py b/docs/source/conf.py index 42b4788..e6f1017 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -14,7 +14,7 @@ project = "collegamento" copyright = "2024, Moosems" author = "Moosems" -release = "v0.2.0" +release = "v0.3.0-rc0" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/example-usage.rst b/docs/source/example-usage.rst index a3096ff..a70cac6 100644 --- a/docs/source/example-usage.rst +++ b/docs/source/example-usage.rst @@ -8,28 +8,29 @@ Now that you have ``Collegamento`` installed, let's try running a simple example from time import sleep - from collegamento import USER_FUNCTION, Request, Response, SimpleClient, SimpleServer + from collegamento import Client, Request, Response, Server - def foo(server: "SimpleServer", bar: Request) -> bool: + def foo(server: "Server", bar: Request) -> bool: if bar["command"] == "test": return True return False def main(): - commands: dict[str, USER_FUNCTION] = {"test": foo} - context = SimpleClient(commands) + # As of collegamento v0.3.0 you can allow multiple requests for the same command + # like so: {"test": (foo, True)} (using (foo, False)) is the default (only newest request) + context = Client({"test": foo}) - context.request({"command": "test"}) + context.request("test") sleep(1) - output: Response | None = context.get_response("test") - if output is not None and output["result"]: # type: ignore + output: Response = context.get_response("test") # type: ignore + if output and output[0]["result"]: # type: ignore print("Yippee! It worked!") else: - print("Aww, maybe your compute is just a little slow?") + print("Aww, maybe your computer is just a little slow?") context.kill_IPC() diff --git a/docs/source/examples/class_example.rst b/docs/source/examples/class_example.rst index b4798bc..ba3d5ba 100644 --- a/docs/source/examples/class_example.rst +++ b/docs/source/examples/class_example.rst @@ -19,7 +19,7 @@ Class Example self.context.update_file("user_file", new_contents) def request_split(self) -> None: - self.context.request({"command": "MyClientFunc", "file": "user_file"}) + self.context.request("MyClientFunc", file="user_file") def check_split(self) -> list[str] | None: output = self.context.get_response("MyClientFunc") diff --git a/docs/source/examples/file_example.rst b/docs/source/examples/file_example.rst index 17f2720..a4e1154 100644 --- a/docs/source/examples/file_example.rst +++ b/docs/source/examples/file_example.rst @@ -6,13 +6,7 @@ File Example from time import sleep - from collegamento import ( - USER_FUNCTION, - FileClient, - FileServer, - Request, - Response, - ) + from collegamento import FileClient, FileServer, Request, Response def split_str(server: "FileServer", arg: Request) -> list[str]: @@ -21,16 +15,15 @@ File Example def main(): - commands: dict[str, USER_FUNCTION] = {"test": split_str} - context = FileClient(commands) + context = FileClient({"test": split_str}) context.update_file("test", "test contents") sleep(1) - context.request({"command": "test", "file": "test"}) + context.request("test", file="test") sleep(1) - output: Response | None = context.get_response("test") + output: Response = context.get_response("test") # type: ignore print(output) context.kill_IPC() diff --git a/docs/source/examples/simple_example.rst b/docs/source/examples/simple_example.rst index 6451b6e..ff52578 100644 --- a/docs/source/examples/simple_example.rst +++ b/docs/source/examples/simple_example.rst @@ -4,32 +4,31 @@ Simple Example .. code-block:: python - from collegamento import ( - USER_FUNCTION, - Request, - Response, - SimpleClient, - SimpleServer, - ) + from time import sleep + from collegamento import Client, Request, Response, Server - def foo(server: "SimpleServer", bar: Request) -> bool: + + def foo(server: "Server", bar: Request) -> bool: if bar["command"] == "test": return True return False def main(): - commands: dict[str, USER_FUNCTION] = {"test": foo} - context = SimpleClient(commands) + # As of collegamento v0.3.0 you can allow multiple requests for the same command + # like so: {"test": (foo, True)} (using (foo, False)) is the default (only newest request) + context = Client({"test": foo}) + + context.request("test") - context.request({"command": "test"}) + sleep(1) - output: Response | None = context.get_response("test") - if output is not None and output["result"]: # type: ignore + output: Response = context.get_response("test") # type: ignore + if output and output[0]["result"]: # type: ignore print("Yippee! It worked!") else: - print("Aww, maybe your compute is just a little slow?") + print("Aww, maybe your computer is just a little slow?") context.kill_IPC() diff --git a/docs/source/variables.rst b/docs/source/variables.rst index 8718369..b47f705 100644 --- a/docs/source/variables.rst +++ b/docs/source/variables.rst @@ -7,4 +7,11 @@ Variables ``USER_FUNCTION`` ***************** -``USER_FUNCTION`` is a type variable that simply states that any function that matches this type takes in a :ref:`SimpleServer Overview` and :ref:`Request Overview` class (positionally) and returns anything or even nothing. +``USER_FUNCTION`` is a type variable that simply states that any function that matches this type takes in a :ref:`Server Overview` and :ref:`Request Overview` class (positionally) and can return anything it pleases (though this will never be used). + +.. _COMMANDS_MAPPING Overview: + +``COMMANDS_MAPPING`` +******************** + +``COMMANDS_MAPPING`` is a type variable that states that any dictionary that matches this type has keys that are strings and values that are either functions that match the :ref:`USER_FUNCTION Overview` type or a tuple with that function and a boolean that indicates whether the function can have multiple :ref:``Request Overview``'s. diff --git a/examples/class_example.py b/examples/class_example.py index f0143ca..b2e7e04 100644 --- a/examples/class_example.py +++ b/examples/class_example.py @@ -13,7 +13,7 @@ def change_file(self, new_contents: str) -> None: self.context.update_file("user_file", new_contents) def request_split(self) -> None: - self.context.request({"command": "MyClientFunc", "file": "user_file"}) + self.context.request("MyClientFunc", file="user_file") def check_split(self) -> list[str] | None: output = self.context.get_response("MyClientFunc") diff --git a/examples/file_example.py b/examples/file_example.py index edf71b4..365503c 100644 --- a/examples/file_example.py +++ b/examples/file_example.py @@ -1,12 +1,6 @@ from time import sleep -from collegamento import ( - USER_FUNCTION, - FileClient, - FileServer, - Request, - Response, -) +from collegamento import FileClient, FileServer, Request, Response def split_str(server: "FileServer", arg: Request) -> list[str]: @@ -15,16 +9,15 @@ def split_str(server: "FileServer", arg: Request) -> list[str]: def main(): - commands: dict[str, USER_FUNCTION] = {"test": split_str} - context = FileClient(commands) + context = FileClient({"test": split_str}) context.update_file("test", "test contents") sleep(1) - context.request({"command": "test", "file": "test"}) + context.request("test", file="test") sleep(1) - output: Response | None = context.get_response("test") + output: Response = context.get_response("test") # type: ignore print(output) context.kill_IPC() diff --git a/examples/simple_example.py b/examples/simple_example.py index 9cb46e2..af18a18 100644 --- a/examples/simple_example.py +++ b/examples/simple_example.py @@ -1,29 +1,28 @@ -from collegamento import ( - USER_FUNCTION, - Request, - Response, - SimpleClient, - SimpleServer, -) +from time import sleep +from collegamento import Client, Request, Response, Server -def foo(server: "SimpleServer", bar: Request) -> bool: + +def foo(server: "Server", bar: Request) -> bool: if bar["command"] == "test": return True return False def main(): - commands: dict[str, USER_FUNCTION] = {"test": foo} - context = SimpleClient(commands) + # As of collegamento v0.3.0 you can allow multiple requests for the same command + # like so: {"test": (foo, True)} (using (foo, False)) is the default (only newest request) + context = Client({"test": foo}) + + context.request("test") - context.request({"command": "test"}) + sleep(1) - output: Response | None = context.get_response("test") - if output is not None and output["result"]: # type: ignore + output: Response = context.get_response("test") # type: ignore + if output and output[0]["result"]: # type: ignore print("Yippee! It worked!") else: - print("Aww, maybe your compute is just a little slow?") + print("Aww, maybe your computer is just a little slow?") context.kill_IPC() diff --git a/setup.py b/setup.py index f8a140c..f659c86 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name="collegamento", - version="0.2.0", + version="0.3.0-rc0", description="Collegamento provides an easy to use Client/Server IPC backend", author="Moosems", author_email="moosems.j@gmail.com", @@ -27,5 +27,5 @@ "License :: OSI Approved :: MIT License", "Typing :: Typed", ], - packages=["collegamento", "collegamento.simple_client_server"], + packages=["collegamento", "collegamento.client_server"], ) diff --git a/tests/test_file_variant.py b/tests/test_file_variant.py index df0a126..8b9f05f 100644 --- a/tests/test_file_variant.py +++ b/tests/test_file_variant.py @@ -1,12 +1,6 @@ from time import sleep -from collegamento import ( - USER_FUNCTION, - FileClient, - FileServer, - Request, - Response, -) +from collegamento import FileClient, FileServer, Request, Response def func(server: FileServer, request: Request) -> bool: @@ -19,26 +13,30 @@ def split_str(server: FileServer, arg: Request) -> list[str]: def test_file_variants(): - commands: dict[str, USER_FUNCTION] = {"test": func} - context = FileClient(commands) + context = FileClient({"test": func}) context.update_file("test", "test contents") - context.request({"command": "test"}) + context.update_file("test2", "test contents2") + sleep(1) + context.create_server() + context.request("test") sleep(1) - output: Response | None = context.get_response("test") + output: Response = context.get_response("test") # type: ignore assert output is not None # noqa: E711 assert output["result"] is True # noqa: E712 # type: ignore - context.add_command("test1", split_str) - context.request({"command": "test1", "file": "test"}) + context.add_command("test1", split_str, True) + context.request("test1", file="test") + context.request("test1", file="test2") sleep(1) - output: Response | None = context.get_response("test1") + output: list[Response] = context.get_response("test1") # type: ignore assert output is not None # noqa: E711 - assert output["result"] == ["test", "contents"] # noqa: E712 # type: ignore + assert output[0]["result"] == ["test", "contents"] # noqa: E712 # type: ignore + assert output[1]["result"] == ["test", "contents2"] # noqa: E712 # type: ignore assert context.all_ids == [] diff --git a/tests/test_simple.py b/tests/test_simple.py index fcafcb4..9b7dd52 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -1,42 +1,49 @@ from time import sleep -from collegamento import ( - USER_FUNCTION, - Request, - Response, - SimpleClient, - SimpleServer, -) +from collegamento import Client, Response -def foo(server: SimpleServer, bar: Request) -> bool: - if bar["command"] == "test": - return True - return False +def foo(server, request): + print("Foo called", request["id"]) -def test_Client_Server(): - commands: dict[str, USER_FUNCTION] = {"test": foo} - context = SimpleClient(commands) +def test_normal_client(): + Client({"foo": foo}) + x = Client({"foo": (foo, True), "foo2": foo}) - context.request({"command": "test"}) - context.add_command("test1", foo) + x.request("foo") + x.request("foo") + x.request("foo2") + x.request("foo2") # If you see six "Foo called"'s, thats bad news bears + x.add_command("foo3", foo) + x.request("foo3") + x.add_command("foo4", foo, True) + x.request("foo4") + x.request("foo4") sleep(1) - output: Response | None = context.get_response("test") + x.check_responses() # Not necessary, we're just checking that doing + # this first doesn't break get_response - assert output is not None # noqa: E711 - assert output["result"] == True # noqa: E712 # type: ignore + foo_r: list[Response] = x.get_response("foo") # type: ignore + foo_two_r: Response = x.get_response("foo2") # type: ignore + foo_three_r: Response = x.get_response("foo3") # type: ignore + foo_four_r: list[Response] = x.get_response("foo4") # type: ignore - context.request({"command": "test1"}) + assert len(foo_r) == 2 + assert foo_two_r + assert foo_three_r + assert len(foo_four_r) == 2 - sleep(1) + x.check_responses() + x.create_server() - output: Response | None = context.get_response("test1") - assert output is not None # noqa: E711 - assert output["result"] == False # noqa: E712 # type: ignore + assert x.all_ids == [] - assert context.all_ids == [] + Client() + Client({"foo": foo}).request("foo") + Client().kill_IPC() + Client().create_server() - context.kill_IPC() + sleep(1)