Skip to content

Commit

Permalink
Fix AttributeError/Remove use of NoteableKernelManager (#194)
Browse files Browse the repository at this point in the history
* Fix AttributeError/Remove use of NoteableKernelManager

* changelog

* Bump version: 0.0.28 → 0.0.29

* fix tests

* fix tests
  • Loading branch information
rohitsanj authored Jul 20, 2023
1 parent 3a3fa04 commit b6d582a
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 208 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.28
current_version = 0.0.29
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
serialize =
{major}.{minor}.{patch}
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.29] - 2023-07-20
### Removed
- Remove use of `NoteableKernelManager` to reduce unnecessary abstractions

### Fixed
- Fix `AttributeError` on trying to fetch `NoteableClient.file_session_cache` which has been removed. Determine kernels channel id from file_id instead.

## [0.0.28] - 2023-07-17
### Changed
- Allow any version of `noteable-origami` to be installed with this library
Expand Down
2 changes: 1 addition & 1 deletion papermill_origami/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.28"
version = "0.0.29"
68 changes: 22 additions & 46 deletions papermill_origami/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
)
from papermill.engines import Engine, NotebookExecutionManager

from .manager import NoteableKernelManager
from .util import flatten_dict, parse_noteable_file_id

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,7 +73,6 @@ def __init__(
self,
nb_man: NotebookExecutionManager,
client: Optional[NoteableClient] = None,
km: Optional[NoteableKernelManager] = None,
timeout_func=None,
timeout: float = None,
log_output: bool = False,
Expand All @@ -88,19 +86,15 @@ def __init__(
----------
nb_man : NotebookExecutionManager
Notebook execution manager wrapper being executed.
km : KernelManager (optional)
Optional kernel manager. If none is provided, a kernel manager will
be created.
"""
self.nb_man = nb_man
self.client = client
self.km = km
self.timeout_func = timeout_func
self.timeout = timeout
self.log_output = log_output
self.stdout_file = stdout_file
self.stderr_file = stderr_file
self.kernel_name = kw.get('kernel_name', '__NOT_SET__')
self.kernel_name = kw.get('kernel_name')
self.nb = nb_man.nb
# Map parent_collection_id to cell_id in order to process any append_output_events
# which are uniquely identified by parent_collection_id and not cell_id
Expand Down Expand Up @@ -136,7 +130,7 @@ def wrapper(cell, *args, **kwargs):
cell.metadata.papermill, parent_key_tuple=("papermill",)
).items():
try:
run_sync(self.km.client.update_cell_metadata)(
run_sync(self.client.update_cell_metadata)(
file=self.file,
cell_id=cell.id,
metadata_update_properties={"path": key, "value": value},
Expand Down Expand Up @@ -245,7 +239,6 @@ async def execute(self, **kwargs):
ext_logger.info(f"Parameterized notebook available at {parameterized_url}")

# Temporarily sleep for 1s to wait for file to be available to be subscribed to.
# TODO: remove this once Noteable API fix is deployed to prod.
await asyncio.sleep(1)

try:
Expand Down Expand Up @@ -287,9 +280,6 @@ async def execute(self, **kwargs):

# Override the notebook_complete method and set it to a no-op (since we already called it)
self.nb_man.notebook_complete = lambda: None

# info_msg = self.wait_for_reply(self.kc.kernel_info())
# self.nb.metadata['language_info'] = info_msg['content']['language_info']
except: # noqa
logger.exception("Error executing notebook")
if self.job_instance_attempt:
Expand Down Expand Up @@ -319,11 +309,11 @@ async def sync_noteable_nb_with_papermill(
deleted_cell_ids = list(set(noteable_nb_cell_ids) - set(papermill_nb_cell_ids))
added_cell_ids = list(set(papermill_nb_cell_ids) - set(noteable_nb_cell_ids))
for cell_id in deleted_cell_ids:
await self.km.client.delete_cell(file, cell_id)
await self.client.delete_cell(file, cell_id)
for cell_id in added_cell_ids:
idx = papermill_nb_cell_ids.index(cell_id)
after_id = papermill_nb_cell_ids[idx - 1] if idx > 0 else None
await self.km.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id)
await self.client.add_cell(file, cell=papermill_nb.cells[idx], after_id=after_id)

ext_logger.info(
"Synced notebook with Noteable, "
Expand All @@ -339,38 +329,26 @@ async def sync_noteable_nb_metadata_with_papermill(self):
self.nb.metadata.papermill, parent_key_tuple=("papermill",)
).items():
try:
await self.km.client.update_nb_metadata(self.file, {"path": key, "value": value})
await self.client.update_nb_metadata(self.file, {"path": key, "value": value})
except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError):
logger.debug("Encountered an error while updating notebook metadata")
pass

@staticmethod
def create_kernel_manager(file: NotebookFile, client: NoteableClient, **kwargs):
"""Helper that generates a kernel manager object from kwargs"""
return NoteableKernelManager(file, client, **kwargs)

@asynccontextmanager
async def setup_kernel(self, cleanup_kc=True, cleanup_kc_on_error=False, **kwargs) -> Generator:
"""Context manager for setting up the kernel to execute a notebook."""
ext_logger = kwargs["logger"]

if self.km is None:
# Assumes that file and client are being passed in
self.km = self.create_kernel_manager(**kwargs)
# Pass in the kernel name if specified
launch_kwargs = {}
if self.kernel_name is not None:
launch_kwargs["kernel_name"] = self.kernel_name

await self.client.get_or_launch_ready_kernel_session(self.file, **launch_kwargs)

await self.km.async_start_kernel(**kwargs)
ext_logger.info("Started kernel")

try:
yield
# if cleanup_kc:
# if await self.km.async_is_alive():
# await self.km.async_shutdown_kernel()
finally:
pass
# if cleanup_kc and cleanup_kc_on_error:
# if await self.km.async_is_alive():
# await self.km.async_shutdown_kernel()
yield

sync_execute = run_sync(execute)

Expand All @@ -381,7 +359,7 @@ def _cell_exception(self, cell, cell_index=None, **kwargs):
self.catch_cell_metadata_updates(self.nb_man.cell_exception)(cell, cell_index, **kwargs)
# Manually update the Noteable nb metadata
try:
run_sync(self.km.client.update_nb_metadata)(
run_sync(self.client.update_nb_metadata)(
self.file, {"path": ["papermill", "exception"], "value": True}
)
except (asyncio.exceptions.TimeoutError, websockets.exceptions.ConnectionClosedError):
Expand All @@ -408,34 +386,35 @@ async def papermill_execute_cells(self):
metadata of each cell.
"""

files_channel = self.km.client.files_channel(file_id=self.km.file.id)
self.km.client.register_message_callback(
files_channel = self.client.files_channel(file_id=self.file.id)
kernels_channel = self.client.kernels_channel(file_id=self.file.id)
self.client.register_message_callback(
self._update_outputs_callback,
files_channel,
"update_output_collection_event",
response_schema=UpdateOutputCollectionEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._append_outputs_callback,
files_channel,
"append_output_event",
response_schema=AppendOutputEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._display_handler_update_callback,
files_channel,
"update_outputs_by_display_id_event",
response_schema=DisplayHandlerUpdateEventSchema,
once=False,
)

self.km.client.register_message_callback(
self.client.register_message_callback(
self._update_execution_count_callback,
self.km.kernel.kernel_channel,
kernels_channel,
"bulk_cell_state_update_event",
response_schema=BulkCellStateMessage,
once=False,
Expand Down Expand Up @@ -651,7 +630,7 @@ async def async_execute_cell(
cell : NotebookNode
The cell which was just processed.
"""
assert self.km.client is not None
assert self.client is not None
if cell.cell_type != 'code':
logger.debug("Skipping non-executing cell %s", cell_index)
return cell
Expand All @@ -672,11 +651,8 @@ async def async_execute_cell(

# By default this will wait until the cell execution status is no longer active

result = await self.km.client.execute(self.km.file, cell.id)
# TODO: This wasn't behaving correctly with the timeout?!
# result = await asyncio.wait_for(self.km.client.execute(self.km.file, cell.id), self._get_timeout(cell))
result = await self.client.execute(self.file, cell.id)
if result.state.is_error_state:
# TODO: Add error info from stacktrace output messages
raise CellExecutionError("", str(result.state), "Cell execution failed")
return cell

Expand Down
125 changes: 0 additions & 125 deletions papermill_origami/manager.py

This file was deleted.

Loading

0 comments on commit b6d582a

Please sign in to comment.