Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Adding the abaility to queue future cell requests with the goal to a)not run any
cells until the viewer has been created and b)inspect cells before running to
check for getters. When getters are found they should create futures and once
they have resolved we can run that cell in its completion.
  • Loading branch information
bnmajor committed Dec 14, 2022
1 parent 5ead9b5 commit c4df265
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 16 deletions.
90 changes: 90 additions & 0 deletions itkwidgets/cell_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
from inspect import isawaitable, iscoroutinefunction
from queue import Queue
import re
import sys
from IPython import get_ipython


class CellWatcher(object):
def __init__(self, viewer):
self.raw = ''
self.id = None
self.viewer = viewer
self.view_object = None
self.shell = get_ipython()
self.kernel = self.shell.kernel
self.execute_request_handler = self.kernel.shell_handlers["execute_request"]

self._events = Queue()

if iscoroutinefunction(self.execute_request_handler): # ipykernel 6+
self.kernel.shell_handlers["execute_request"] = self.capture_event_async
else:
# ipykernel < 6
self.kernel.shell_handlers["execute_request"] = self.capture_event

self.shell.events.register('pre_run_cell', self.pre_run_cell)
self.shell.events.register('post_run_cell', self.post_run_cell)

def capture_event(self, stream, ident, parent):
self._events.put((stream, ident, parent))

async def capture_event_async(self, stream, ident, parent):
self.capture_event(stream, ident, parent)

def preprocess_getters(self, raw):
regex = f"{self.view_object}.get_(.*)\([^()]*\)"
# Find and evaluate any getters in the cell
[self.shell.ev(m.group()) for m in re.finditer(regex, raw)]

async def execute_next_request(self):
stream, ident, parent = self._events.get()

self.preprocess_getters(parent["content"]["code"])

sys.stdout.flush()
sys.stderr.flush()
self.kernel.set_parent(ident, parent)
if self.kernel._aborting:
self.kernel._send_abort_reply(stream, parent, ident)
else:
rr = self.kernel.execute_request(stream, ident, parent)
if isawaitable(rr):
rr = await rr

shell_stream = getattr(self.kernel, "shell_stream", None) # ipykernel 6 vs 5 differences

# replicate shell_dispatch behaviour
sys.stdout.flush()
sys.stderr.flush()
if shell_stream is not None: # 6+
self.kernel._publish_status("idle", "shell")
shell_stream.flush(2)
else:
self.kernel._publish_status("idle")

if self._events.qsize():
await self.execute_next_request()

def _callback(self, name, future):
self.viewer.results[name].set_result(future.result())
getters_resolved = [f.done() for f in self.viewer.results.values()]
# if all getters have resolved then ready to re-run
if all(getters_resolved):
self.shell.run_cell(raw_cell=self.raw, cell_id=self.id)

def pre_run_cell(self, info):
# grab info for re-run
self.raw = info.raw_cell
self.id = info.cell_id

def find_view_object(self, raw):
assignments = re.split("\n|=|;", raw)
obj_idx = [i-1 for i, v in enumerate(assignments) if 'view(' in v]
if len(obj_idx):
self.view_object = assignments[obj_idx[0]].strip()

def post_run_cell(self, result):
if self.view_object is None:
self.find_view_object(result.info.raw_cell)
20 changes: 4 additions & 16 deletions itkwidgets/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import threading
from imjoy_rpc import api
from typing import List
from IPython import get_ipython
from IPython.display import display, HTML
from IPython.lib import backgroundjobs as bg
import uuid

from ._type_aliases import Gaussians, Style, Image, Point_Sets
from ._initialization_params import init_params_dict, init_key_aliases
from ._method_types import deferred_methods
from .cell_watcher import CellWatcher
from .integrations import _detect_render_type, _get_viewer_image, _get_viewer_point_sets
from .integrations.environment import ENVIRONMENT, Env
from .render_types import RenderType
Expand Down Expand Up @@ -140,8 +140,7 @@ def __init__(
self.viewer_rpc = ViewerRPC(
ui_collapsed=ui_collapsed, rotate=rotate, ui=ui, data=data
)
self.ip = get_ipython()
self.ip.events.register('pre_run_cell', self.pre_run_cell)
self.cw = CellWatcher(self)
if ENVIRONMENT is not Env.JUPYTERLITE:
self._setup_queueing()
api.export(self.viewer_rpc)
Expand Down Expand Up @@ -196,6 +195,7 @@ def _run_queued_requests(queue):

# Wait for the viewer to be created
self.viewer_rpc.viewer_event.wait()
await self.cw.execute_next_request()
while self.queue.qsize():
_run_queued_requests(self.queue)
# Wait for the data to be set
Expand All @@ -209,23 +209,11 @@ def queue_worker(self):
task = loop.create_task(self.run_queued_requests())
loop.run_until_complete(task)

def pre_run_cell(self, info):
# grab info for re-run
self.raw = info.raw_cell
self.id = info.cell_id

def _callback(self, name, future):
self.results[name].set_result(future.result())
getters_resolved = [f.done() for f in self.results.values()]
# if all getters have resolved then ready to re-run
if all(getters_resolved):
self.ip.run_cell(raw_cell=self.raw, cell_id=self.id)

def call_getter(self, method, *args, **kwargs):
self.results[method] = self.loop.create_future()
fn = getattr(self.viewer_rpc.itk_viewer, method)
future = asyncio.ensure_future(fn(*args, **kwargs))
future.add_done_callback(functools.partial(self._callback, method))
future.add_done_callback(functools.partial(self.cw._callback, method))

def queue_request(self, method, *args, **kwargs):
if ENVIRONMENT is Env.JUPYTERLITE or self.has_viewer:
Expand Down

0 comments on commit c4df265

Please sign in to comment.