Skip to content

Commit

Permalink
Update job instance attempt status during papermill execution (#69)
Browse files Browse the repository at this point in the history
* Update job instance attempt status

* only update if we have a job instance attempt in the first place

* remove strict type check

* add debug logs

* refactor comment

* refactor

* refactor

* upgrade noteable-origami to 0.0.15

* fix tests

* blacken

* update changelog

* Bump version: 0.0.10 → 0.0.11
  • Loading branch information
rohitsanj authored Nov 19, 2022
1 parent bbd5510 commit 5ad4dcc
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.0.10
current_version = 0.0.11
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
serialize =
{major}.{minor}.{patch}
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.0.11] - 2022-11-18
### Added
- Added API calls to update job instance attempt status during execution

## [0.0.10]
### Changed
- Upgrade `noteable-origami` to `0.0.14`
Expand Down
2 changes: 1 addition & 1 deletion papermill_origami/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.10"
version = "0.0.11"
48 changes: 43 additions & 5 deletions papermill_origami/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
CustomerJobDefinitionReferenceInput,
CustomerJobInstanceReferenceInput,
JobInstanceAttempt,
JobInstanceAttemptRequest,
JobInstanceAttemptStatus,
JobInstanceAttemptUpdate,
)
from origami.defs.rtu import (
AppendOutputEventSchema,
Expand Down Expand Up @@ -106,6 +108,7 @@ def __init__(
# Currently, this is only used when the output is of type display_data.
self.__noteable_output_id_cache = {}
self.file = None
self.job_instance_attempt: Optional[JobInstanceAttempt] = None

def catch_cell_metadata_updates(self, func):
"""A decorator for catching cell metadata updates related to papermill
Expand Down Expand Up @@ -152,7 +155,13 @@ async def execute(self, **kwargs):
else:
raise ValueError("No file_id or derivable file_id found")

job_instance_attempt = kwargs.get("job_instance_attempt")
job_instance_attempt_request = (
JobInstanceAttemptRequest.parse_obj(kwargs.get('job_instance_attempt'))
if kwargs.get('job_instance_attempt')
else None
)

# Setup job instance attempt from the provided customer job metadata
if job_metadata := kwargs.get("job_metadata", {}):
version = await self.client.get_version_or_none(original_notebook_id)
if version is not None:
Expand All @@ -161,7 +170,7 @@ async def execute(self, **kwargs):
file = await self.client.get_notebook(original_notebook_id)
space_id = file.space_id

# 1: Ensure the job definition&instance references exists
# 1: Ensure the job definition and instance references exists
job_instance = await self.client.create_job_instance(
CustomerJobInstanceReferenceInput(
orchestrator_job_instance_id=job_metadata.get('job_instance_id'),
Expand All @@ -179,16 +188,18 @@ async def execute(self, **kwargs):

# 2: Set up the job instance attempt
# TODO: update the job instance attempt status while running/after completion
job_instance_attempt = JobInstanceAttempt(
job_instance_attempt_request = JobInstanceAttemptRequest(
status=JobInstanceAttemptStatus.CREATED,
attempt_number=0,
customer_job_instance_reference_id=job_instance.id,
)

# Create the parameterized_notebook
self.file = await self.client.create_parameterized_notebook(
original_notebook_id, job_instance_attempt=job_instance_attempt
resp = await self.client.create_parameterized_notebook(
original_notebook_id, job_instance_attempt=job_instance_attempt_request
)
self.file = resp.parameterized_notebook
self.job_instance_attempt = resp.job_instance_attempt
parameterized_url = f"https://{self.client.config.domain}/f/{self.file.id}"

self.nb.metadata["executed_notebook_url"] = parameterized_url
Expand Down Expand Up @@ -234,6 +245,18 @@ async def execute(self, **kwargs):
# Sync metadata from papermill to noteable before execution
await self.sync_noteable_nb_metadata_with_papermill()

# We're going to start executing the notebook; Update the job instance attempt status to RUNNING
if self.job_instance_attempt:
logger.debug(
f"Updating job instance attempt id {self.job_instance_attempt.id} to status RUNNING"
)
await self.client.update_job_instance(
job_instance_attempt_id=self.job_instance_attempt.id,
job_instance_attempt_update=JobInstanceAttemptUpdate(
status=JobInstanceAttemptStatus.RUNNING
),
)

await self.papermill_execute_cells()

# This is a hack to ensure we have the client in session to send nb metadata
Expand Down Expand Up @@ -384,17 +407,32 @@ async def papermill_execute_cells(self):
)

# Execute each cell and update the output in real time.
errored = False
for index, cell in enumerate(self.nb.cells):
try:
self._cell_start(cell, index)
await self.async_execute_cell(cell, index)
except CellExecutionError as ex:
# TODO: Make sure we raise these
self._cell_exception(self.nb.cells[index], index, exception=ex)
errored = True
break
finally:
self._cell_complete(self.nb.cells[index], cell_index=index)

# Update the job instance attempt status
if self.job_instance_attempt:
status = (
JobInstanceAttemptStatus.FAILED if errored else JobInstanceAttemptStatus.SUCCEEDED
)
logger.debug(
f"Updating job instance attempt id {self.job_instance_attempt.id} to status {status}"
)
await self.client.update_job_instance(
job_instance_attempt_id=self.job_instance_attempt.id,
job_instance_attempt_update=JobInstanceAttemptUpdate(status=status),
)

def _get_timeout(self, cell: Optional[NotebookNode]) -> int:
"""Helper to fetch a timeout as a value or a function to be run against a cell"""
if self.timeout_func is not None and cell is not None:
Expand Down
4 changes: 3 additions & 1 deletion papermill_origami/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ def mock_noteable_client(mocker, file):
mock_noteable_client = mocker.patch(
'papermill_origami.engine.NoteableClient', return_value=mocker.AsyncMock()
)
create_resp = mocker.Mock()
create_resp.parameterized_notebook = file
mock_noteable_client.return_value.__aenter__.return_value.create_parameterized_notebook.return_value = (
file
create_resp
)


Expand Down
Loading

0 comments on commit 5ad4dcc

Please sign in to comment.