Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment.start Returns Records #744

Open
wants to merge 3 commits into
base: smartsim-refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions smartsim/_core/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@

def unpack(value: _NestedJobSequenceType) -> t.Generator[Job, None, None]:
"""Unpack any iterable input in order to obtain a
single sequence of values
single sequence of values.

:param value: Sequence containing elements of type Job or other
sequences that are also of type _NestedJobSequenceType
:return: flattened list of Jobs"""
sequences that are also of type `_NestedJobSequenceType`.
:raises TypeError: If the value is not a nested sequence of jobs.
:return: A flattened list of `Jobs`.
"""
from smartsim.launchable.job import Job

for item in value:
Expand Down
99 changes: 46 additions & 53 deletions smartsim/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from smartsim._core.control.launch_history import LaunchHistory as _LaunchHistory
from smartsim._core.utils import helpers as _helpers
from smartsim.error import errors
from smartsim.launchable.job import Job
from smartsim.launchable.job import Job, Record
from smartsim.status import TERMINAL_STATUSES, InvalidJobStatus, JobStatus

from ._core import Generator, Manifest
Expand All @@ -52,7 +52,6 @@
from .log import ctx_exp_path, get_logger, method_contextualizer

if t.TYPE_CHECKING:
from smartsim.launchable.job import Job
from smartsim.types import LaunchedJobID

logger = get_logger(__name__)
Expand Down Expand Up @@ -158,26 +157,24 @@ def __init__(self, name: str, exp_path: str | None = None):
experiment
"""

def start(self, *jobs: Job | t.Sequence[Job]) -> tuple[LaunchedJobID, ...]:
def start(self, *jobs: Job | t.Sequence[Job]) -> tuple[Record, ...]:
MattToast marked this conversation as resolved.
Show resolved Hide resolved
"""Execute a collection of `Job` instances.
MattToast marked this conversation as resolved.
Show resolved Hide resolved

:param jobs: A collection of other job instances to start
:raises TypeError: If jobs provided are not the correct type
:raises ValueError: No Jobs were provided.
:returns: A sequence of ids with order corresponding to the sequence of
jobs that can be used to query or alter the status of that
particular execution of the job.
:returns: A sequence of records with order corresponding to the
sequence of jobs that can be used to query or alter the status of
that particular execution of the job.
"""

if not jobs:
raise ValueError("No jobs provided to start")

# Create the run id
jobs_ = list(_helpers.unpack(jobs))

run_id = datetime.datetime.now().replace(microsecond=0).isoformat()
root = pathlib.Path(self.exp_path, run_id)
return self._dispatch(Generator(root), dispatch.DEFAULT_DISPATCHER, *jobs_)
ids = self._dispatch(Generator(root), dispatch.DEFAULT_DISPATCHER, *jobs_)
return tuple(Record(id_, job) for id_, job in zip(ids, jobs_))

def _dispatch(
self,
Expand Down Expand Up @@ -233,15 +230,13 @@ def execute_dispatch(generator: Generator, job: Job, idx: int) -> LaunchedJobID:
execute_dispatch(generator, job, idx) for idx, job in enumerate(jobs, 1)
)

def get_status(
self, *ids: LaunchedJobID
) -> tuple[JobStatus | InvalidJobStatus, ...]:
"""Get the status of jobs launched through the `Experiment` from their
launched job id returned when calling `Experiment.start`.
def get_status(self, *records: Record) -> tuple[JobStatus | InvalidJobStatus, ...]:
"""Get the status of jobs launched through the `Experiment` from the
record returned when calling `Experiment.start`.

The `Experiment` will map the launched ID back to the launcher that
started the job and request a status update. The order of the returned
statuses exactly matches the order of the launched job ids.
The `Experiment` will map the launched id of the record back to the
launcher that started the job and request a status update. The order of
the returned statuses exactly matches the order of the records.

If the `Experiment` cannot find any launcher that started the job
associated with the launched job id, then a
Expand All @@ -252,16 +247,17 @@ def get_status(
launched job ids issued by user defined launcher are not sufficiently
unique.

:param ids: A sequence of launched job ids issued by the experiment.
:param records: A sequence of records issued by the experiment.
:raises TypeError: If records provided are not the correct type
:raises ValueError: No records were provided.
:returns: A tuple of statuses with order respective of the order of the
calling arguments.
"""
if not ids:
raise ValueError("No job ids provided to get status")
if not all(isinstance(id, str) for id in ids):
raise TypeError("ids argument was not of type LaunchedJobID")
if not records:
raise ValueError("No records provided to get status")
if not all(isinstance(record, Record) for record in records):
raise TypeError("record argument was not of type Record")
ids = tuple(record.launched_id for record in records)

to_query = self._launch_history.group_by_launcher(
set(ids), unknown_ok=True
Expand All @@ -272,39 +268,38 @@ def get_status(
return tuple(stats)

def wait(
self, *ids: LaunchedJobID, timeout: float | None = None, verbose: bool = True
self, *records: Record, timeout: float | None = None, verbose: bool = True
) -> None:
"""Block execution until all of the provided launched jobs, represented
by a record, have entered a terminal status.

:param ids: The ids of the launched jobs to wait for.
:param records: The records of the launched jobs to wait for.
:param timeout: The max time to wait for all of the launched jobs to end.
:param verbose: Whether found statuses should be displayed in the console.
:raises TypeError: If records provided are not the correct type
:raises ValueError: No records were provided.
"""
if ids:
if not all(isinstance(id, str) for id in ids):
raise TypeError("ids argument was not of type LaunchedJobID")
else:
raise ValueError("No job ids to wait on provided")
if not records:
raise ValueError("No records to wait on provided")
if not all(isinstance(record, Record) for record in records):
raise TypeError("record argument was not of type Record")
self._poll_for_statuses(
ids, TERMINAL_STATUSES, timeout=timeout, verbose=verbose
records, TERMINAL_STATUSES, timeout=timeout, verbose=verbose
)

def _poll_for_statuses(
self,
ids: t.Sequence[LaunchedJobID],
records: t.Sequence[Record],
statuses: t.Collection[JobStatus],
timeout: float | None = None,
interval: float = 5.0,
verbose: bool = True,
) -> dict[LaunchedJobID, JobStatus | InvalidJobStatus]:
) -> dict[Record, JobStatus | InvalidJobStatus]:
"""Poll the experiment's launchers for the statuses of the launched
jobs with the provided records, until the status of each job changes to
one of the provided statuses.

:param ids: The ids of the launched jobs to wait for.
:param records: The records of the launched jobs to wait for.
:param statuses: A collection of statuses to poll for.
:param timeout: The minimum amount of time to spend polling all jobs to
reach one of the supplied statuses. If not supplied or `None`, the
Expand All @@ -320,12 +315,10 @@ def _poll_for_statuses(
log = logger.info if verbose else lambda *_, **__: None
method_timeout = _interval.SynchronousTimeInterval(timeout)
iter_timeout = _interval.SynchronousTimeInterval(interval)
final: dict[LaunchedJobID, JobStatus | InvalidJobStatus] = {}
final: dict[Record, JobStatus | InvalidJobStatus] = {}

def is_finished(
id_: LaunchedJobID, status: JobStatus | InvalidJobStatus
) -> bool:
job_title = f"Job({id_}): "
def is_finished(record: Record, status: JobStatus | InvalidJobStatus) -> bool:
job_title = f"Job({record.launched_id}, {record.job.name}): "
if done := status in terminal:
log(f"{job_title}Finished with status '{status.value}'")
else:
Expand All @@ -334,22 +327,22 @@ def is_finished(

if iter_timeout.infinite:
raise ValueError("Polling interval cannot be infinite")
while ids and not method_timeout.expired:
while records and not method_timeout.expired:
iter_timeout = iter_timeout.new_interval()
stats = zip(ids, self.get_status(*ids))
stats = zip(records, self.get_status(*records))
is_done = _helpers.group_by(_helpers.pack_params(is_finished), stats)
final |= dict(is_done.get(True, ()))
ids = tuple(id_ for id_, _ in is_done.get(False, ()))
if ids:
records = tuple(rec for rec, _ in is_done.get(False, ()))
if records:
(
iter_timeout
if iter_timeout.remaining < method_timeout.remaining
else method_timeout
).block()
if ids:
if records:
raise TimeoutError(
f"Job ID(s) {', '.join(map(str, ids))} failed to reach "
"terminal status before timeout"
f"Job ID(s) {', '.join(rec.launched_id for rec in records)} "
"failed to reach terminal status before timeout"
)
return final

Expand Down Expand Up @@ -445,20 +438,20 @@ def summary(self, style: str = "github") -> str:
disable_numparse=True,
)

def stop(self, *ids: LaunchedJobID) -> tuple[JobStatus | InvalidJobStatus, ...]:
def stop(self, *records: Record) -> tuple[JobStatus | InvalidJobStatus, ...]:
"""Cancel the execution of a previously launched job.

:param ids: The ids of the launched jobs to stop.
:param records: The records of the launched jobs to stop.
:raises TypeError: If records provided are not the correct type
:raises ValueError: No records were provided.
:returns: A tuple of job statuses upon cancellation with order
respective of the order of the calling arguments.
"""
if ids:
if not all(isinstance(id, str) for id in ids):
raise TypeError("ids argument was not of type LaunchedJobID")
else:
raise ValueError("No job ids provided")
if not records:
raise ValueError("No records provided")
if not all(isinstance(record, Record) for record in records):
raise TypeError("record argument was not of type Record")
ids = tuple(record.launched_id for record in records)
by_launcher = self._launch_history.group_by_launcher(set(ids), unknown_ok=True)
id_to_stop_stat = (
launcher.stop_jobs(*launched).items()
Expand Down
43 changes: 43 additions & 0 deletions smartsim/launchable/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from __future__ import annotations

import textwrap
import typing as t
from copy import deepcopy

Expand All @@ -39,6 +40,7 @@

if t.TYPE_CHECKING:
from smartsim.entity.entity import SmartSimEntity
from smartsim.types import LaunchedJobID

Check warning on line 43 in smartsim/launchable/job.py

View check run for this annotation

Codecov / codecov/patch

smartsim/launchable/job.py#L43

Added line #L43 was not covered by tests


@t.final
Expand Down Expand Up @@ -158,3 +160,44 @@
string = f"SmartSim Entity: {self.entity}\n"
string += f"Launch Settings: {self.launch_settings}"
return string


@t.final
class Record:
"""A Record object to track a launched job along with its assigned
launch ID.
"""

def __init__(self, launch_id: LaunchedJobID, job: Job) -> None:
"""Initialize a new record of a launched job

:param launch_id: A unique identifier for the launch of the job.
:param job: The job that was launched.
"""
self._id = launch_id
Copy link
Contributor

@amandarichardsonn amandarichardsonn Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you deep copying the Job but not the launch_id here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, the launch_id (at least for now) is just a string that we ask the type checker to treat as a different type, and strings are immutable in Python, so a "deepcopy of" and a "reference to" a string are functionally equivalent.

There is actually some special logic in copy.deepcopy so that, by default, it will just return the original reference to a string, unchanged, if it encounters one.

TL;DR I was lazy and didn't want to spell out "deepcopy". More than willing to add it though if we think that LaunchedJobID might become a more substantive type in future!

self._job = deepcopy(job)

@property
def launched_id(self) -> LaunchedJobID:
"""The unique identifier for the launched job.

:returns: A unique identifier for the launched job.
"""
return self._id

@property
def job(self) -> Job:
"""A deep copy of the job that was launched.
MattToast marked this conversation as resolved.
Show resolved Hide resolved

:returns: A deep copy of the launched job.
"""
return deepcopy(self._job)

def __str__(self) -> str:
return textwrap.dedent(f"""\

Check warning on line 197 in smartsim/launchable/job.py

View check run for this annotation

Codecov / codecov/patch

smartsim/launchable/job.py#L197

Added line #L197 was not covered by tests
Launch Record:
Launched Job ID:
{self.launched_id}
Launched Job:
{textwrap.indent(str(self._job), " ")}
""")
Loading
Loading