From b6d582a44db681beb99a46dd90a12222137aaaac Mon Sep 17 00:00:00 2001 From: Rohit Sanjay Date: Thu, 20 Jul 2023 14:16:32 -0700 Subject: [PATCH] Fix AttributeError/Remove use of NoteableKernelManager (#194) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix AttributeError/Remove use of NoteableKernelManager * changelog * Bump version: 0.0.28 → 0.0.29 * fix tests * fix tests --- .bumpversion.cfg | 2 +- CHANGELOG.md | 7 ++ papermill_origami/_version.py | 2 +- papermill_origami/engine.py | 68 +++++--------- papermill_origami/manager.py | 125 ------------------------- papermill_origami/tests/test_engine.py | 45 ++++----- poetry.lock | 44 +++++++-- pyproject.toml | 2 +- 8 files changed, 87 insertions(+), 208 deletions(-) delete mode 100644 papermill_origami/manager.py diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 1e2c613..3adcaf1 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.0.28 +current_version = 0.0.29 parse = (?P\d+)\.(?P\d+)\.(?P\d+) serialize = {major}.{minor}.{patch} diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f51406..8a653c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [0.0.29] - 2023-07-20 +### Removed +- Remove use of `NoteableKernelManager` to reduce unnecessary abstractions + +### Fixed +- Fix `AttributeError` on trying to fetch `NoteableClient.file_session_cache` which has been removed. Determine kernels channel id from file_id instead. + ## [0.0.28] - 2023-07-17 ### Changed - Allow any version of `noteable-origami` to be installed with this library diff --git a/papermill_origami/_version.py b/papermill_origami/_version.py index 7dc6883..be67c33 100644 --- a/papermill_origami/_version.py +++ b/papermill_origami/_version.py @@ -1 +1 @@ -version = "0.0.28" +version = "0.0.29" diff --git a/papermill_origami/engine.py b/papermill_origami/engine.py index 26f34ac..ee32f58 100644 --- a/papermill_origami/engine.py +++ b/papermill_origami/engine.py @@ -37,7 +37,6 @@ ) from papermill.engines import Engine, NotebookExecutionManager -from .manager import NoteableKernelManager from .util import flatten_dict, parse_noteable_file_id logger = logging.getLogger(__name__) @@ -74,7 +73,6 @@ def __init__( self, nb_man: NotebookExecutionManager, client: Optional[NoteableClient] = None, - km: Optional[NoteableKernelManager] = None, timeout_func=None, timeout: float = None, log_output: bool = False, @@ -88,19 +86,15 @@ def __init__( ---------- nb_man : NotebookExecutionManager Notebook execution manager wrapper being executed. - km : KernelManager (optional) - Optional kernel manager. If none is provided, a kernel manager will - be created. """ self.nb_man = nb_man self.client = client - self.km = km self.timeout_func = timeout_func self.timeout = timeout self.log_output = log_output self.stdout_file = stdout_file self.stderr_file = stderr_file - self.kernel_name = kw.get('kernel_name', '__NOT_SET__') + self.kernel_name = kw.get('kernel_name') self.nb = nb_man.nb # Map parent_collection_id to cell_id in order to process any append_output_events # which are uniquely identified by parent_collection_id and not cell_id @@ -136,7 +130,7 @@ def wrapper(cell, *args, **kwargs): cell.metadata.papermill, parent_key_tuple=("papermill",) ).items(): try: - run_sync(self.km.client.update_cell_metadata)( + run_sync(self.client.update_cell_metadata)( file=self.file, cell_id=cell.id, metadata_update_properties={"path": key, "value": value}, @@ -245,7 +239,6 @@ async def execute(self, **kwargs): ext_logger.info(f"Parameterized notebook available at {parameterized_url}") # Temporarily sleep for 1s to wait for file to be available to be subscribed to. - # TODO: remove this once Noteable API fix is deployed to prod. await asyncio.sleep(1) try: @@ -287,9 +280,6 @@ async def execute(self, **kwargs): # Override the notebook_complete method and set it to a no-op (since we already called it) self.nb_man.notebook_complete = lambda: None - - # info_msg = self.wait_for_reply(self.kc.kernel_info()) - # self.nb.metadata['language_info'] = info_msg['content']['language_info'] except: # noqa logger.exception("Error executing notebook") if self.job_instance_attempt: @@ -319,11 +309,11 @@ async def sync_noteable_nb_with_papermill( deleted_cell_ids = list(set(noteable_nb_cell_ids) - set(papermill_nb_cell_ids)) added_cell_ids = list(set(papermill_nb_cell_ids) - set(noteable_nb_cell_ids)) for cell_id in deleted_cell_ids: - await self.km.client.delete_cell(file, cell_id) + await self.client.delete_cell(file, cell_id) for cell_id in added_cell_ids: idx = papermill_nb_cell_ids.index(cell_id) after_id = papermill_nb_cell_ids[idx - 1] if idx > 0 else None - await self.km.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id) + await self.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id) ext_logger.info( "Synced notebook with Noteable, " @@ -339,38 +329,26 @@ async def sync_noteable_nb_metadata_with_papermill(self): self.nb.metadata.papermill, parent_key_tuple=("papermill",) ).items(): try: - await self.km.client.update_nb_metadata(self.file, {"path": key, "value": value}) + await self.client.update_nb_metadata(self.file, {"path": key, "value": value}) except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError): logger.debug("Encountered an error while updating notebook metadata") pass - @staticmethod - def create_kernel_manager(file: NotebookFile, client: NoteableClient, **kwargs): - """Helper that generates a kernel manager object from kwargs""" - return NoteableKernelManager(file, client, **kwargs) - @asynccontextmanager async def setup_kernel(self, cleanup_kc=True, cleanup_kc_on_error=False, **kwargs) -> Generator: """Context manager for setting up the kernel to execute a notebook.""" ext_logger = kwargs["logger"] - if self.km is None: - # Assumes that file and client are being passed in - self.km = self.create_kernel_manager(**kwargs) + # Pass in the kernel name if specified + launch_kwargs = {} + if self.kernel_name is not None: + launch_kwargs["kernel_name"] = self.kernel_name + + await self.client.get_or_launch_ready_kernel_session(self.file, **launch_kwargs) - await self.km.async_start_kernel(**kwargs) ext_logger.info("Started kernel") - try: - yield - # if cleanup_kc: - # if await self.km.async_is_alive(): - # await self.km.async_shutdown_kernel() - finally: - pass - # if cleanup_kc and cleanup_kc_on_error: - # if await self.km.async_is_alive(): - # await self.km.async_shutdown_kernel() + yield sync_execute = run_sync(execute) @@ -381,7 +359,7 @@ def _cell_exception(self, cell, cell_index=None, **kwargs): self.catch_cell_metadata_updates(self.nb_man.cell_exception)(cell, cell_index, **kwargs) # Manually update the Noteable nb metadata try: - run_sync(self.km.client.update_nb_metadata)( + run_sync(self.client.update_nb_metadata)( self.file, {"path": ["papermill", "exception"], "value": True} ) except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError): @@ -408,8 +386,9 @@ async def papermill_execute_cells(self): metadata of each cell. """ - files_channel = self.km.client.files_channel(file_id=self.km.file.id) - self.km.client.register_message_callback( + files_channel = self.client.files_channel(file_id=self.file.id) + kernels_channel = self.client.kernels_channel(file_id=self.file.id) + self.client.register_message_callback( self._update_outputs_callback, files_channel, "update_output_collection_event", @@ -417,7 +396,7 @@ async def papermill_execute_cells(self): once=False, ) - self.km.client.register_message_callback( + self.client.register_message_callback( self._append_outputs_callback, files_channel, "append_output_event", @@ -425,7 +404,7 @@ async def papermill_execute_cells(self): once=False, ) - self.km.client.register_message_callback( + self.client.register_message_callback( self._display_handler_update_callback, files_channel, "update_outputs_by_display_id_event", @@ -433,9 +412,9 @@ async def papermill_execute_cells(self): once=False, ) - self.km.client.register_message_callback( + self.client.register_message_callback( self._update_execution_count_callback, - self.km.kernel.kernel_channel, + kernels_channel, "bulk_cell_state_update_event", response_schema=BulkCellStateMessage, once=False, @@ -651,7 +630,7 @@ async def async_execute_cell( cell : NotebookNode The cell which was just processed. """ - assert self.km.client is not None + assert self.client is not None if cell.cell_type != 'code': logger.debug("Skipping non-executing cell %s", cell_index) return cell @@ -672,11 +651,8 @@ async def async_execute_cell( # By default this will wait until the cell execution status is no longer active - result = await self.km.client.execute(self.km.file, cell.id) - # TODO: This wasn't behaving correctly with the timeout?! - # result = await asyncio.wait_for(self.km.client.execute(self.km.file, cell.id), self._get_timeout(cell)) + result = await self.client.execute(self.file, cell.id) if result.state.is_error_state: - # TODO: Add error info from stacktrace output messages raise CellExecutionError("", str(result.state), "Cell execution failed") return cell diff --git a/papermill_origami/manager.py b/papermill_origami/manager.py deleted file mode 100644 index f8f818a..0000000 --- a/papermill_origami/manager.py +++ /dev/null @@ -1,125 +0,0 @@ -"""This file implements a jupyter_client KernelManager using the origami library""" - -import functools -from typing import Optional - -from jupyter_client.managerabc import KernelManagerABC -from jupyter_client.utils import run_sync -from origami.client import NoteableClient -from origami.defs.files import NotebookFile - - -class NoteableKernelManager(KernelManagerABC): - """KernelManager for Noteable client interactions""" - - def _requires_client_context(func): - """A helper for checking if one is in a websocket context or not""" - - @functools.wraps(func) - async def wrapper(self, *args, **kwargs): - if not self.client.in_context: - raise ValueError("Cannot send RTU request outside of a context manager scope.") - return await func(self, *args, **kwargs) - - return wrapper - - def __init__( - self, file: NotebookFile, client: NoteableClient, kernel_name: Optional[str] = None, **kw - ): - """Sets up basic trackers for the Manager""" - self.open_context = False - self.client: NoteableClient = client - self.file = file - - async def __aenter__(self): - """Helper for context wrapping the client if so desired""" - await self.client.__aenter__() - return self - - async def __aexit__(self, *excinfo): - """Helper for context wrapping the client if so desired""" - await self.client.__aexit__(*excinfo) - - @property - def kernel(self): - """Returns the session details object holding kernel session info""" - return self.client.file_session_cache.get(self.file.id) - - # -------------------------------------------------------------------------- - # Expected functions not part of ABC - # -------------------------------------------------------------------------- - - def pre_start_kernel(self, **kw): - """Can be overwritten to modify kw args. The first return value is always None as - Noteable does not allow for the kernel command to be overwritten - """ - return None, kw - - def post_start_kernel(self, **kw) -> None: - """Can be overwritten to take actions after a kernel cleanup occurs""" - pass - - # -------------------------------------------------------------------------- - # Kernel management - # -------------------------------------------------------------------------- - - async def async_launch_kernel(self, **kw): - """Actually launch the kernel - - Override in a subclass to launch kernel subprocesses differently - """ - import inspect - - func_args = inspect.signature(self.client.get_or_launch_ready_kernel_session).parameters - filtered_kw = {k: v for k, v in kw.items() if k in func_args} - filtered_kw.pop('file') # We're passing it in already - return await self.client.get_or_launch_ready_kernel_session(self.file, **filtered_kw) - - launch_kernel = run_sync(async_launch_kernel) - - async def async_start_kernel(self, **kw): - """Launches a new kernel if not already launched""" - _, kw = self.pre_start_kernel(**kw) - await self.async_launch_kernel(**kw) - self.post_start_kernel(**kw) - return self.kernel - - start_kernel = run_sync(async_start_kernel) - - async def async_shutdown_kernel(self, now=False, restart=False): - """Shutdown the active or pending kernel pod""" - await self.client.delete_kernel_session(self.file) - - shutdown_kernel = run_sync(async_shutdown_kernel) - - async def async_restart_kernel(self, now=False, **kw): - """Restarts a kernel process by forcibly generating a new k8 pod""" - raise NotImplementedError("TODO") - - restart_kernel = run_sync(async_restart_kernel) - - async def async_has_kernel(self): - """Causes a request to be made to check on kernel""" - # TODO: Change to RTU update instead of polling - session = await self.client.get_kernel_session(self.file) - return session is not None and not session.kernel.execution_state.is_gone - - has_kernel = run_sync(async_restart_kernel) - - async def async_interrupt_kernel(self): - """Interrupts active execution on a live kernel""" - raise NotImplementedError("TODO") - - interrupt_kernel = run_sync(async_interrupt_kernel) - - def signal_kernel(self, signum): - """Not Implemented: Kernel managers can normally forward signals to process based kernels""" - raise NotImplementedError("Direct process signaling is not allowed for Noteable kernels") - - async def async_is_alive(self): - """Causes a request to be made to check on kernel""" - # TODO: Change to RTU update instead of polling - session = await self.client.get_kernel_session(self.file) - return session is not None and session.kernel.execution_state.kernel_is_alive - - is_alive = run_sync(async_is_alive) diff --git a/papermill_origami/tests/test_engine.py b/papermill_origami/tests/test_engine.py index e27e082..970cbd8 100644 --- a/papermill_origami/tests/test_engine.py +++ b/papermill_origami/tests/test_engine.py @@ -22,23 +22,15 @@ @pytest.fixture def mock_noteable_client(mocker, file): - patch_async_noteable_client = mocker.patch( - 'papermill_origami.engine.NoteableClient', return_value=mocker.AsyncMock() - ) - create_resp = mocker.Mock() - create_resp.parameterized_notebook = file - # This is done to mock the interactions with async context manager usage of NoteableClient - patch_async_noteable_client.return_value.__aenter__.return_value.create_parameterized_notebook.return_value = ( - create_resp - ) + create_parameterized_notebook_resp = mocker.Mock() + create_parameterized_notebook_resp.parameterized_notebook = file - # Return a mock client to be used by the NoteableEngine - client = mocker.Mock() - client.subscribe_file = mocker.AsyncMock() - client.update_job_instance = mocker.AsyncMock() - client.delete_kernel_session = mocker.AsyncMock() - client.create_parameterized_notebook = mocker.AsyncMock(return_value=create_resp) - return client + return mocker.patch( + 'papermill_origami.engine.NoteableClient', + return_value=mocker.AsyncMock( + **{"create_parameterized_notebook.return_value": create_parameterized_notebook_resp} + ), + ) @pytest.fixture @@ -52,12 +44,10 @@ def noteable_engine(mocker, file, file_content, mock_noteable_client): execute_result = mocker.Mock() execute_result.state.is_error_state = False - noteable_engine = NoteableEngine( - nb_man=mock_nb_man, km=mocker.AsyncMock(), client=mock_noteable_client - ) + noteable_engine = NoteableEngine(nb_man=mock_nb_man, client=mock_noteable_client()) # Set the execution result to successful - noteable_engine.km.client.execute.return_value = execute_result + noteable_engine.client.execute.return_value = execute_result return noteable_engine @@ -79,8 +69,8 @@ async def test_sync_noteable_nb_with_papermill(file, file_content, mocker, notea ext_logger=logging.getLogger(__name__), ) - noteable_engine.km.client.delete_cell.assert_called_with(ANY, deleted_cell['id']) - noteable_engine.km.client.add_cell.assert_called_with(ANY, cell=added_cell, after_id=after_id) + noteable_engine.client.delete_cell.assert_called_with(ANY, deleted_cell['id']) + noteable_engine.client.add_cell.assert_called_with(ANY, cell=added_cell, after_id=after_id) async def test_default_client(mocker, file, file_content, noteable_engine): @@ -90,8 +80,9 @@ async def test_default_client(mocker, file, file_content, noteable_engine): noteable_nb=file_content, logger=logging.getLogger(__name__), ) + noteable_engine.client.get_or_launch_ready_kernel_session.assert_called_once() # Check that we sent an execute request to the client - noteable_engine.km.client.execute.assert_has_calls( + noteable_engine.client.execute.assert_has_calls( [mocker.call(ANY, cell.id) for cell in file_content.cells], any_order=True ) @@ -109,10 +100,8 @@ async def test_ignore_empty_code_cells(mocker, file, file_content, noteable_engi ) # Check that we did not try to execute the empty cells - assert noteable_engine.km.client.execute.call_count == len(file_content.cells) - len( - empty_cells - ) - noteable_engine.km.client.execute.assert_has_calls( + assert noteable_engine.client.execute.call_count == len(file_content.cells) - len(empty_cells) + noteable_engine.client.execute.assert_has_calls( [mocker.call(ANY, cell.id) for cell in non_empty_cells], any_order=True, ) @@ -128,7 +117,7 @@ async def test_propagate_cell_execution_error(mocker, file, file_content, noteab # Set the last cell to be an error execute_results[-1].state.is_error_state = True - noteable_engine.km.client.execute.side_effect = execute_results + noteable_engine.client.execute.side_effect = execute_results await noteable_engine.execute( file_id='fake_id', diff --git a/poetry.lock b/poetry.lock index 09cb869..e2ba4a3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1692,6 +1692,18 @@ wrapt = ">=1.10,<2" [package.extras] dev = ["PyTest", "PyTest (<5)", "PyTest-Cov", "PyTest-Cov (<2.6)", "bump2version (<1)", "configparser (<5)", "importlib-metadata (<3)", "importlib-resources (<4)", "sphinx (<2)", "sphinxcontrib-websupport (<2)", "tox", "zipp (<2)"] +[[package]] +name = "diff-match-patch" +version = "20200713" +description = "Repackaging of Google's Diff Match and Patch libraries. Offers robust algorithms to perform the operations required for synchronizing plain text." +category = "main" +optional = false +python-versions = ">=2.7" +files = [ + {file = "diff-match-patch-20200713.tar.gz", hash = "sha256:da6f5a01aa586df23dfc89f3827e1cafbb5420be9d87769eeb079ddfd9477a18"}, + {file = "diff_match_patch-20200713-py3-none-any.whl", hash = "sha256:8bf9d9c4e059d917b5c6312bac0c137971a32815ddbda9c682b949f2986b4d34"}, +] + [[package]] name = "dill" version = "0.3.6" @@ -3941,27 +3953,29 @@ files = [ [[package]] name = "noteable-origami" -version = "0.0.24" +version = "0.0.35" description = "The Noteable API interface" category = "main" optional = false python-versions = ">=3.8,<4.0" files = [ - {file = "noteable_origami-0.0.24-py3-none-any.whl", hash = "sha256:4613ddd96195a271dbb1bcb6c8368e3f61c8de4e7ea5b6dbdd62b4aa52747f6d"}, - {file = "noteable_origami-0.0.24.tar.gz", hash = "sha256:1be7d7fd434a5212bd7ffc8b36d8ac20aa9f246839f480be275d8c7f09c7075d"}, + {file = "noteable_origami-0.0.35-py3-none-any.whl", hash = "sha256:62a59599acdf31572a60071a4f41851ca73b04846a506986cb70012c5fea808c"}, + {file = "noteable_origami-0.0.35.tar.gz", hash = "sha256:61799a91c9cb5a5a592c0a9451493f7e8a61dbd4542e9236543a64112a883236"}, ] [package.dependencies] backoff = ">=2.1.2,<3.0.0" bitmath = ">=1.3.3,<2.0.0" -cryptography = ">=39.0.1,<40.0.0" -httpx = ">=0.23.3,<0.24.0" +cryptography = "*" +diff-match-patch = ">=20200713,<20200714" +httpx = "*" jwt = ">=1.3.1,<2.0.0" nbformat = ">=5.4.0,<6.0.0" orjson = ">=3.8.7,<4.0.0" pydantic = ">=1.10.5,<2.0.0" +sending = ">=0.3.0,<0.4.0" structlog = ">=22.1.0,<23.0.0" -websockets = ">=10.3,<11.0" +websockets = "*" [[package]] name = "notebook" @@ -5683,6 +5697,24 @@ nativelib = ["pyobjc-framework-Cocoa", "pywin32"] objc = ["pyobjc-framework-Cocoa"] win32 = ["pywin32"] +[[package]] +name = "sending" +version = "0.3.0" +description = "Library for pub/sub usage within an async application" +category = "main" +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "sending-0.3.0-py3-none-any.whl", hash = "sha256:baad4a8bc08aa55d50bb8d177effb20fa5de1012fc71c2678c90d2af215a403c"}, + {file = "sending-0.3.0.tar.gz", hash = "sha256:4d275253f99bbaf8f791335275a6f64c02ab00870aebccb5a7a257ee86b76801"}, +] + +[package.extras] +all = ["aioredis[hiredis] (>=2.0.0,<3.0.0)", "ipykernel (>=6.15.1,<7.0.0)", "jupyter_client (>=7.3.0,<8.0.0)", "websockets (>=10.3,<11.0)"] +jupyter = ["ipykernel (>=6.15.1,<7.0.0)", "jupyter_client (>=7.3.0,<8.0.0)"] +redis = ["aioredis[hiredis] (>=2.0.0,<3.0.0)"] +websockets = ["websockets (>=10.3,<11.0)"] + [[package]] name = "setproctitle" version = "1.3.2" diff --git a/pyproject.toml b/pyproject.toml index b51aa70..c7db2a3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "papermill-origami" -version = "0.0.28" +version = "0.0.29" description = "The noteable API interface" authors = ["Matt Seal "] maintainers = ["Matt Seal "]