From c4df265964e3296e5bbfec983d40490dd80050b4 Mon Sep 17 00:00:00 2001 From: Brianna Major Date: Tue, 13 Dec 2022 22:23:46 -0500 Subject: [PATCH] WIP Adding the abaility to queue future cell requests with the goal to a)not run any cells until the viewer has been created and b)inspect cells before running to check for getters. When getters are found they should create futures and once they have resolved we can run that cell in its completion. --- itkwidgets/cell_watcher.py | 90 ++++++++++++++++++++++++++++++++++++++ itkwidgets/viewer.py | 20 ++------- 2 files changed, 94 insertions(+), 16 deletions(-) create mode 100644 itkwidgets/cell_watcher.py diff --git a/itkwidgets/cell_watcher.py b/itkwidgets/cell_watcher.py new file mode 100644 index 00000000..4b46ae45 --- /dev/null +++ b/itkwidgets/cell_watcher.py @@ -0,0 +1,90 @@ +import asyncio +from inspect import isawaitable, iscoroutinefunction +from queue import Queue +import re +import sys +from IPython import get_ipython + + +class CellWatcher(object): + def __init__(self, viewer): + self.raw = '' + self.id = None + self.viewer = viewer + self.view_object = None + self.shell = get_ipython() + self.kernel = self.shell.kernel + self.execute_request_handler = self.kernel.shell_handlers["execute_request"] + + self._events = Queue() + + if iscoroutinefunction(self.execute_request_handler): # ipykernel 6+ + self.kernel.shell_handlers["execute_request"] = self.capture_event_async + else: + # ipykernel < 6 + self.kernel.shell_handlers["execute_request"] = self.capture_event + + self.shell.events.register('pre_run_cell', self.pre_run_cell) + self.shell.events.register('post_run_cell', self.post_run_cell) + + def capture_event(self, stream, ident, parent): + self._events.put((stream, ident, parent)) + + async def capture_event_async(self, stream, ident, parent): + self.capture_event(stream, ident, parent) + + def preprocess_getters(self, raw): + regex = f"{self.view_object}.get_(.*)\([^()]*\)" + # Find and evaluate any getters in the cell + [self.shell.ev(m.group()) for m in re.finditer(regex, raw)] + + async def execute_next_request(self): + stream, ident, parent = self._events.get() + + self.preprocess_getters(parent["content"]["code"]) + + sys.stdout.flush() + sys.stderr.flush() + self.kernel.set_parent(ident, parent) + if self.kernel._aborting: + self.kernel._send_abort_reply(stream, parent, ident) + else: + rr = self.kernel.execute_request(stream, ident, parent) + if isawaitable(rr): + rr = await rr + + shell_stream = getattr(self.kernel, "shell_stream", None) # ipykernel 6 vs 5 differences + + # replicate shell_dispatch behaviour + sys.stdout.flush() + sys.stderr.flush() + if shell_stream is not None: # 6+ + self.kernel._publish_status("idle", "shell") + shell_stream.flush(2) + else: + self.kernel._publish_status("idle") + + if self._events.qsize(): + await self.execute_next_request() + + def _callback(self, name, future): + self.viewer.results[name].set_result(future.result()) + getters_resolved = [f.done() for f in self.viewer.results.values()] + # if all getters have resolved then ready to re-run + if all(getters_resolved): + self.shell.run_cell(raw_cell=self.raw, cell_id=self.id) + + def pre_run_cell(self, info): + # grab info for re-run + self.raw = info.raw_cell + self.id = info.cell_id + + def find_view_object(self, raw): + assignments = re.split("\n|=|;", raw) + obj_idx = [i-1 for i, v in enumerate(assignments) if 'view(' in v] + if len(obj_idx): + self.view_object = assignments[obj_idx[0]].strip() + + def post_run_cell(self, result): + if self.view_object is None: + self.find_view_object(result.info.raw_cell) diff --git a/itkwidgets/viewer.py b/itkwidgets/viewer.py index bde1ef60..45771ce3 100644 --- a/itkwidgets/viewer.py +++ b/itkwidgets/viewer.py @@ -4,7 +4,6 @@ import threading from imjoy_rpc import api from typing import List -from IPython import get_ipython from IPython.display import display, HTML from IPython.lib import backgroundjobs as bg import uuid @@ -12,6 +11,7 @@ from ._type_aliases import Gaussians, Style, Image, Point_Sets from ._initialization_params import init_params_dict, init_key_aliases from ._method_types import deferred_methods +from .cell_watcher import CellWatcher from .integrations import _detect_render_type, _get_viewer_image, _get_viewer_point_sets from .integrations.environment import ENVIRONMENT, Env from .render_types import RenderType @@ -140,8 +140,7 @@ def __init__( self.viewer_rpc = ViewerRPC( ui_collapsed=ui_collapsed, rotate=rotate, ui=ui, data=data ) - self.ip = get_ipython() - self.ip.events.register('pre_run_cell', self.pre_run_cell) + self.cw = CellWatcher(self) if ENVIRONMENT is not Env.JUPYTERLITE: self._setup_queueing() api.export(self.viewer_rpc) @@ -196,6 +195,7 @@ def _run_queued_requests(queue): # Wait for the viewer to be created self.viewer_rpc.viewer_event.wait() + await self.cw.execute_next_request() while self.queue.qsize(): _run_queued_requests(self.queue) # Wait for the data to be set @@ -209,23 +209,11 @@ def queue_worker(self): task = loop.create_task(self.run_queued_requests()) loop.run_until_complete(task) - def pre_run_cell(self, info): - # grab info for re-run - self.raw = info.raw_cell - self.id = info.cell_id - - def _callback(self, name, future): - self.results[name].set_result(future.result()) - getters_resolved = [f.done() for f in self.results.values()] - # if all getters have resolved then ready to re-run - if all(getters_resolved): - self.ip.run_cell(raw_cell=self.raw, cell_id=self.id) - def call_getter(self, method, *args, **kwargs): self.results[method] = self.loop.create_future() fn = getattr(self.viewer_rpc.itk_viewer, method) future = asyncio.ensure_future(fn(*args, **kwargs)) - future.add_done_callback(functools.partial(self._callback, method)) + future.add_done_callback(functools.partial(self.cw._callback, method)) def queue_request(self, method, *args, **kwargs): if ENVIRONMENT is Env.JUPYTERLITE or self.has_viewer: