From 3c7b60b3bc0cccb9639b5e8d3322cd357633f4b9 Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Tue, 19 Dec 2023 08:44:29 -0500 Subject: [PATCH] DOC: Add docstrings to CellWatcher class --- itkwidgets/cell_watcher.py | 170 ++++++++++++++++++++++++++++--------- 1 file changed, 131 insertions(+), 39 deletions(-) diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py index 2cc75213..51d31146 100644 --- a/itkwidgets/cell_watcher.py +++ b/itkwidgets/cell_watcher.py @@ -1,10 +1,12 @@ import asyncio import sys from inspect import isawaitable, iscoroutinefunction -from typing import Dict, List +from typing import Callable, Dict, List from IPython import get_ipython +from IPython.core.interactiveshell import ExecutionResult from queue import Queue from imjoy_rpc.utils import FuturePromise +from zmq.eventloop.zmqstream import ZMQStream background_tasks = set() @@ -72,13 +74,23 @@ def viewer_ready(self, view: str) -> bool: class CellWatcher(object): + """A singleton class used in interactive Jupyter notebooks in order to + support asynchronous network communication that would otherwise be blocked + by the IPython kernel. + """ + def __new__(cls): - if not hasattr(cls, '_instance'): + """Create a singleton class.""" + if not hasattr(cls, "_instance"): cls._instance = super(CellWatcher, cls).__new__(cls) cls._instance.setup() return cls._instance - def setup(self): + def setup(self) -> None: + """Perform the initial setup, including intercepting 'execute_request' + handlers so that we can handle them internally before the IPython + kernel does. + """ self.viewers = Viewers() self.shell = get_ipython() self.kernel = self.shell.kernel @@ -102,22 +114,52 @@ def setup(self): # Call self.post_run_cell every time the post_run_cell signal is emitted # post_run_cell runs after interactive execution (e.g. a cell in a notebook) - self.shell.events.register('post_run_cell', self.post_run_cell) + self.shell.events.register("post_run_cell", self.post_run_cell) + + def add_viewer(self, view: str) -> None: + """Add a new Viewer object to track. - def add_viewer(self, view): + :param view: The unique string identifier for the Viewer object + :type view: str + """ # Track all Viewer instances self.viewers.add_viewer(view) - def update_viewer_status(self, view, status): + def update_viewer_status(self, view: str, status: bool) -> None: + """Update a Viewer's 'ready' status. If the last cell run failed + because the viewer was unavailable try to run the cell again. + + :param view: The unique string identifier for the Viewer object + :type view: str + :param status: Boolean value indicating whether or not the viewer is + available for requests or updates. This should be false when the plugin + API is not yet available or new data is not yet rendered. + :type status: bool + """ self.viewers.update_viewer_status(view, status) - if self.waiting_on_viewer: + if status and self.waiting_on_viewer: # Might be ready now, try again self.create_task(self.execute_next_request) - def viewer_ready(self, view): + def viewer_ready(self, view: str) -> bool: + """Request the 'ready' status of a viewer. + + :param view: The unique string identifier for the Viewer object + :type view: str + + :return: Boolean value indicating whether or not the viewer is + available for requests or updates. This will be false when the plugin + API is not yet available or new data is not yet rendered. + :rtype: bool + """ return self.viewers.viewer_ready(view) - def _task_cleanup(self, task): + def _task_cleanup(self, task: asyncio.Task) -> None: + """Callback to discard references to tasks once they've completed. + + :param task: Completed task that no longer needs a strong reference + :type task: asyncio.Task + """ global background_tasks try: # "Handle" exceptions here to prevent further errors. Exceptions @@ -127,7 +169,12 @@ def _task_cleanup(self, task): except: background_tasks.discard(task) - def create_task(self, fn): + def create_task(self, fn: Callable) -> None: + """Create a task from the function passed in. + + :param fn: Coroutine to run concurrently as a Task + :type fn: Callable + """ global background_tasks # The event loop only keeps weak references to tasks. # Gather them into a set to avoid garbage collection mid-task. @@ -135,32 +182,70 @@ def create_task(self, fn): background_tasks.add(task) task.add_done_callback(self._task_cleanup) - def capture_event(self, stream, ident, parent): + def capture_event(self, stream: ZMQStream, ident: list, parent: dict) -> None: + """Capture execute_request messages so that we can queue and process + them concurrently as tasks to prevent blocking. + + :param stream: Class to manage event-based messaging on a zmq socket + :type stream: ZMQStream + :param ident: ZeroMQ routing prefix, which can be zero or more socket + identities + :type ident: list + :param parent: A dictonary of dictionaries representing a complete + message as defined by the Jupyter message specification + :type parent: dict + """ self._events.put((stream, ident, parent)) if self._events.qsize() == 1 and self.ready_to_run_next_cell(): # We've added a new task to an empty queue. # Begin executing tasks again. self.create_task(self.execute_next_request) - async def capture_event_async(self, stream, ident, parent): + async def capture_event_async( + self, stream: ZMQStream, ident: list, parent: dict + ) -> None: + """Capture execute_request messages so that we can queue and process + them concurrently as tasks to prevent blocking. + Asynchronous for ipykernel 6+. + + :param stream: Class to manage event-based messaging on a zmq socket + :type stream: ZMQStream + :param ident: ZeroMQ routing prefix, which can be zero or more socket + identities + :type ident: list + :param parent: A dictonary of dictionaries representing a complete + message as defined by the Jupyter message specification + :type parent: dict + """ # ipykernel 6+ self.capture_event(stream, ident, parent) @property - def all_getters_resolved(self): - # Check if all of the getter/setter futures have resolved + def all_getters_resolved(self) -> bool: + """Determine if all tasks representing asynchronous network calls that + fetch values have resolved. + + :return: Whether or not all tasks for the current cell have resolved + :rtype: bool + """ getters_resolved = [f.done() for f in self.results.values()] return all(getters_resolved) - def ready_to_run_next_cell(self): - # Any itk_viewer objects need to be available and all getters/setters - # need to be resolved + def ready_to_run_next_cell(self) -> bool: + """Determine if we are ready to run the next cell in the queue. + + :return: If created Viewer objects are available and all futures are + resolved. + :rtype: bool + """ self.waiting_on_viewer = len(self.viewers.not_created) return self.all_getters_resolved and not self.waiting_on_viewer - async def execute_next_request(self): - # Modeled after the approach used in jupyter-ui-poll - # https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101 + async def execute_next_request(self) -> None: + """Grab the next request if needed and then run the cell if it it ready + to be run. Modeled after the approach used in jupyter-ui-poll. + :ref: https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101 + """ if self._events.empty(): self.abort_all = False @@ -172,7 +257,8 @@ async def execute_next_request(self): # Continue processing the remaining queued tasks await self._execute_next_request() - async def _execute_next_request(self): + async def _execute_next_request(self) -> None: + """Run the cell with the ipykernel shell_handler for execute_request""" # Here we actually run the queued cell as it would have been run stream, ident, parent = self.current_request @@ -202,32 +288,38 @@ async def _execute_next_request(self): # Continue processing the remaining queued tasks self.create_task(self.execute_next_request) - def update_namespace(self): - # Update the namespace variables with the results from the getters + def update_namespace(self) -> None: + """Update the namespace variables with the results from the getters""" # FIXME: This is a temporary "fix" and does not handle updating output keys = [k for k in self.shell.user_ns.keys()] - try: - for key in keys: - value = self.shell.user_ns[key] - if asyncio.isfuture(value) and (isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)): - # Getters/setters return futures - # They should all be resolved now, so use the result - self.shell.user_ns[key] = value.result() - self.results.clear() - except Exception as e: - self.results.clear() - self.abort_all = True - self.create_task(self._execute_next_request) - raise e - - def _callback(self, *args, **kwargs): + for key in keys: + value = self.shell.user_ns[key] + if asyncio.isfuture(value) and ( + isinstance(value, FuturePromise) or isinstance(value, asyncio.Task) + ): + # Functions that need to return values from asynchronous + # network requests return futures. They should all be resolved + # now, so use the result. + self.shell.user_ns[key] = value.result() + self.results.clear() + + def _callback(self, *args, **kwargs) -> None: + """After each future resolves check to see if they are all resolved. If + so, update the namespace and run the next cell in the queue. + """ # After each getter/setter resolves check if they've all resolved if self.all_getters_resolved: self.update_namespace() self.current_request = None self.create_task(self.execute_next_request) - def post_run_cell(self, response): + def post_run_cell(self, response: ExecutionResult) -> None: + """Runs after interactive execution (e.g. a cell in a notebook). Set + the abort flag if there are errors produced by cell execution. + + :param response: The response message produced by cell execution + :type response: ExecutionResult + """ # Abort remaining cells on error in execution if response.error_in_exec is not None: self.abort_all = True