Skip to content

Commit

Permalink
DOC: Add docstrings to CellWatcher class
Browse files Browse the repository at this point in the history
  • Loading branch information
bnmajor committed Dec 19, 2023
1 parent cf6fe9d commit 3c7b60b
Showing 1 changed file with 131 additions and 39 deletions.
170 changes: 131 additions & 39 deletions itkwidgets/cell_watcher.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import sys
from inspect import isawaitable, iscoroutinefunction
from typing import Dict, List
from typing import Callable, Dict, List
from IPython import get_ipython
from IPython.core.interactiveshell import ExecutionResult
from queue import Queue
from imjoy_rpc.utils import FuturePromise
from zmq.eventloop.zmqstream import ZMQStream

background_tasks = set()

Expand Down Expand Up @@ -72,13 +74,23 @@ def viewer_ready(self, view: str) -> bool:


class CellWatcher(object):
"""A singleton class used in interactive Jupyter notebooks in order to
support asynchronous network communication that would otherwise be blocked
by the IPython kernel.
"""

def __new__(cls):
if not hasattr(cls, '_instance'):
"""Create a singleton class."""
if not hasattr(cls, "_instance"):
cls._instance = super(CellWatcher, cls).__new__(cls)
cls._instance.setup()
return cls._instance

def setup(self):
def setup(self) -> None:
"""Perform the initial setup, including intercepting 'execute_request'
handlers so that we can handle them internally before the IPython
kernel does.
"""
self.viewers = Viewers()
self.shell = get_ipython()
self.kernel = self.shell.kernel
Expand All @@ -102,22 +114,52 @@ def setup(self):

# Call self.post_run_cell every time the post_run_cell signal is emitted
# post_run_cell runs after interactive execution (e.g. a cell in a notebook)
self.shell.events.register('post_run_cell', self.post_run_cell)
self.shell.events.register("post_run_cell", self.post_run_cell)

def add_viewer(self, view: str) -> None:
"""Add a new Viewer object to track.
def add_viewer(self, view):
:param view: The unique string identifier for the Viewer object
:type view: str
"""
# Track all Viewer instances
self.viewers.add_viewer(view)

def update_viewer_status(self, view, status):
def update_viewer_status(self, view: str, status: bool) -> None:
"""Update a Viewer's 'ready' status. If the last cell run failed
because the viewer was unavailable try to run the cell again.
:param view: The unique string identifier for the Viewer object
:type view: str
:param status: Boolean value indicating whether or not the viewer is
available for requests or updates. This should be false when the plugin
API is not yet available or new data is not yet rendered.
:type status: bool
"""
self.viewers.update_viewer_status(view, status)
if self.waiting_on_viewer:
if status and self.waiting_on_viewer:
# Might be ready now, try again
self.create_task(self.execute_next_request)

def viewer_ready(self, view):
def viewer_ready(self, view: str) -> bool:
"""Request the 'ready' status of a viewer.
:param view: The unique string identifier for the Viewer object
:type view: str
:return: Boolean value indicating whether or not the viewer is
available for requests or updates. This will be false when the plugin
API is not yet available or new data is not yet rendered.
:rtype: bool
"""
return self.viewers.viewer_ready(view)

def _task_cleanup(self, task):
def _task_cleanup(self, task: asyncio.Task) -> None:
"""Callback to discard references to tasks once they've completed.
:param task: Completed task that no longer needs a strong reference
:type task: asyncio.Task
"""
global background_tasks
try:
# "Handle" exceptions here to prevent further errors. Exceptions
Expand All @@ -127,40 +169,83 @@ def _task_cleanup(self, task):
except:
background_tasks.discard(task)

def create_task(self, fn):
def create_task(self, fn: Callable) -> None:
"""Create a task from the function passed in.
:param fn: Coroutine to run concurrently as a Task
:type fn: Callable
"""
global background_tasks
# The event loop only keeps weak references to tasks.
# Gather them into a set to avoid garbage collection mid-task.
task = asyncio.create_task(fn())
background_tasks.add(task)
task.add_done_callback(self._task_cleanup)

def capture_event(self, stream, ident, parent):
def capture_event(self, stream: ZMQStream, ident: list, parent: dict) -> None:
"""Capture execute_request messages so that we can queue and process
them concurrently as tasks to prevent blocking.
:param stream: Class to manage event-based messaging on a zmq socket
:type stream: ZMQStream
:param ident: ZeroMQ routing prefix, which can be zero or more socket
identities
:type ident: list
:param parent: A dictionary of dictionaries representing a complete
message as defined by the Jupyter message specification
:type parent: dict
"""
self._events.put((stream, ident, parent))
if self._events.qsize() == 1 and self.ready_to_run_next_cell():
# We've added a new task to an empty queue.
# Begin executing tasks again.
self.create_task(self.execute_next_request)

async def capture_event_async(self, stream, ident, parent):
async def capture_event_async(
self, stream: ZMQStream, ident: list, parent: dict
) -> None:
"""Capture execute_request messages so that we can queue and process
them concurrently as tasks to prevent blocking.
Asynchronous for ipykernel 6+.
:param stream: Class to manage event-based messaging on a zmq socket
:type stream: ZMQStream
:param ident: ZeroMQ routing prefix, which can be zero or more socket
identities
:type ident: list
:param parent: A dictionary of dictionaries representing a complete
message as defined by the Jupyter message specification
:type parent: dict
"""
# ipykernel 6+
self.capture_event(stream, ident, parent)

@property
def all_getters_resolved(self):
# Check if all of the getter/setter futures have resolved
def all_getters_resolved(self) -> bool:
"""Determine if all tasks representing asynchronous network calls that
fetch values have resolved.
:return: Whether or not all tasks for the current cell have resolved
:rtype: bool
"""
getters_resolved = [f.done() for f in self.results.values()]
return all(getters_resolved)

def ready_to_run_next_cell(self):
# Any itk_viewer objects need to be available and all getters/setters
# need to be resolved
def ready_to_run_next_cell(self) -> bool:
"""Determine if we are ready to run the next cell in the queue.
:return: If created Viewer objects are available and all futures are
resolved.
:rtype: bool
"""
self.waiting_on_viewer = len(self.viewers.not_created)
return self.all_getters_resolved and not self.waiting_on_viewer

async def execute_next_request(self):
# Modeled after the approach used in jupyter-ui-poll
# https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
async def execute_next_request(self) -> None:
"""Grab the next request if needed and then run the cell if it is ready
to be run. Modeled after the approach used in jupyter-ui-poll.
:ref: https://github.com/Kirill888/jupyter-ui-poll/blob/f65b81f95623c699ed7fd66a92be6d40feb73cde/jupyter_ui_poll/_poll.py#L75-L101
"""
if self._events.empty():
self.abort_all = False

Expand All @@ -172,7 +257,8 @@ async def execute_next_request(self):
# Continue processing the remaining queued tasks
await self._execute_next_request()

async def _execute_next_request(self):
async def _execute_next_request(self) -> None:
"""Run the cell with the ipykernel shell_handler for execute_request"""
# Here we actually run the queued cell as it would have been run
stream, ident, parent = self.current_request

Expand Down Expand Up @@ -202,32 +288,38 @@ async def _execute_next_request(self):
# Continue processing the remaining queued tasks
self.create_task(self.execute_next_request)

def update_namespace(self):
# Update the namespace variables with the results from the getters
def update_namespace(self) -> None:
"""Update the namespace variables with the results from the getters"""
# FIXME: This is a temporary "fix" and does not handle updating output
keys = [k for k in self.shell.user_ns.keys()]
try:
for key in keys:
value = self.shell.user_ns[key]
if asyncio.isfuture(value) and (isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)):
# Getters/setters return futures
# They should all be resolved now, so use the result
self.shell.user_ns[key] = value.result()
self.results.clear()
except Exception as e:
self.results.clear()
self.abort_all = True
self.create_task(self._execute_next_request)
raise e

def _callback(self, *args, **kwargs):
for key in keys:
value = self.shell.user_ns[key]
if asyncio.isfuture(value) and (
isinstance(value, FuturePromise) or isinstance(value, asyncio.Task)
):
# Functions that need to return values from asynchronous
# network requests return futures. They should all be resolved
# now, so use the result.
self.shell.user_ns[key] = value.result()
self.results.clear()

def _callback(self, *args, **kwargs) -> None:
"""After each future resolves check to see if they are all resolved. If
so, update the namespace and run the next cell in the queue.
"""
# After each getter/setter resolves check if they've all resolved
if self.all_getters_resolved:
self.update_namespace()
self.current_request = None
self.create_task(self.execute_next_request)

def post_run_cell(self, response):
def post_run_cell(self, response: ExecutionResult) -> None:
"""Runs after interactive execution (e.g. a cell in a notebook). Set
the abort flag if there are errors produced by cell execution.
:param response: The response message produced by cell execution
:type response: ExecutionResult
"""
# Abort remaining cells on error in execution
if response.error_in_exec is not None:
self.abort_all = True

0 comments on commit 3c7b60b

Please sign in to comment.