IL-405: warn on failed task (#690)
The user gets informed when the run/evaluation pipeline crashes for certain examples.
* Information is printed on failed runs and evaluations.
* The stack trace is stored in the `FailedExampleRun`/`FailedExampleEvaluation` object
* The `Runner` and `Evaluator` have an optional flag `abort_on_error` to stop running/evaluating when an error occurs.
* feat: IL-405 added `Runner.failed_runs(..)`, `RunRepository.failed_example_outputs(..)`
* feat: Add `Evaluator.failed_evaluations(..)` (see the usage sketch below)
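
A minimal usage sketch of these additions (hedged: the `runner`, `evaluator`, and `dataset_id` objects are assumed to be configured elsewhere, and overview IDs are assumed to be exposed as `.id`; only the `abort_on_error` flag comes from this change):

```python
# Sketch only: `runner`, `evaluator`, and `dataset_id` are assumed to exist already.

# Default behaviour: a crashing example is reported on the console, stored as a
# FailedExampleRun / FailedExampleEvaluation, and processing continues.
run_overview = runner.run_dataset(dataset_id)
evaluation_overview = evaluator.evaluate_runs(run_overview.id)

# Strict behaviour: re-raise the first exception and stop.
runner.run_dataset(dataset_id, abort_on_error=True)
evaluator.evaluate_runs(run_overview.id, abort_on_error=True)
```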

---------

Co-authored-by: FelixFehse <[email protected]>
NiklasKoehneckeAA and FelixFehseTNG authored Apr 3, 2024
1 parent 65f4052 commit c75834a
Showing 10 changed files with 265 additions and 54 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,18 @@
# Changelog

## 0.8.0

### Breaking Changes

### New Features
- feature: Error information is printed to the console on failed runs and evaluations.
- feature: The stack trace of a failed run/evaluation is included in the `FailedExampleRun`/`FailedExampleEvaluation` object
- feature: The `Runner.run_dataset` and `Evaluator.evaluate_run` have an optional flag `abort_on_error` to stop running/evaluating when an error occurs.
- feature: Added `Runner.failed_runs` and `Evaluator.failed_evaluations` to retrieve all failed run / evaluation lineages
- feature: Added `.successful_example_outputs` and `.failed_example_outputs` to `RunRepository` to match the evaluation repository

### Fixes

## 0.7.0

### Breaking Changes
3 changes: 2 additions & 1 deletion src/intelligence_layer/evaluation/evaluation/domain.py
@@ -1,3 +1,4 @@
import traceback
from datetime import datetime
from typing import Generic, Optional, TypeVar

@@ -22,7 +23,7 @@ class FailedExampleEvaluation(BaseModel):
@staticmethod
def from_exception(exception: Exception) -> "FailedExampleEvaluation":
return FailedExampleEvaluation(
error_message=f"{type(exception)}: {str(exception)}"
error_message=f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}"
)


69 changes: 45 additions & 24 deletions src/intelligence_layer/evaluation/evaluation/evaluator.py
@@ -1,3 +1,4 @@
import typing
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
@@ -222,7 +223,10 @@ def evaluation_type(self) -> type[Evaluation]:

@final
def evaluate_runs(
self, *run_ids: str, num_examples: Optional[int] = None
self,
*run_ids: str,
num_examples: Optional[int] = None,
abort_on_error: bool = False,
) -> EvaluationOverview:
"""Evaluates all generated outputs in the run.
@@ -239,6 +243,7 @@
specific evaluation. The method compares all runs of the provided ids to each other.
num_examples: The number of examples which should be evaluated from the given runs.
Always the first n runs stored in the evaluation repository
abort_on_error: Flag to abort all evaluations when an error occurs. Defaults to False.
Returns:
EvaluationOverview: An overview of the evaluation. Individual :class:`Evaluation`s will not be
@@ -293,7 +298,7 @@ def generate_evaluation_inputs() -> Iterable[
current_example = 0
for example_outputs in examples_zipped:
successful_example_outputs = [
output
typing.cast(SuccessfulExampleOutput[Output], output)
for output in example_outputs
if not isinstance(output.output, FailedExampleRun)
]
@@ -320,31 +325,19 @@
yield (
example,
eval_id,
[
SuccessfulExampleOutput(
run_id=example_output.run_id,
example_id=example_output.example_id,
output=example_output.output,
)
for example_output in successful_example_outputs
if not isinstance(example_output.output, FailedExampleRun)
],
successful_example_outputs,
)

def evaluate(
args: Tuple[
Example[Input, ExpectedOutput],
str,
Sequence[SuccessfulExampleOutput[Output]],
],
) -> None:
example, eval_id, example_outputs = args
self.evaluate(example, eval_id, *example_outputs)

with ThreadPoolExecutor(max_workers=10) as executor:
tqdm(
executor.map(evaluate, generate_evaluation_inputs()),
desc="Evaluating",
list( # the list is needed to consume the iterator returned from the executor.map
tqdm(
executor.map(
lambda args: self.evaluate(
args[0], args[1], abort_on_error, *args[2]
),
generate_evaluation_inputs(),
)
)
)

partial_overview = EvaluationOverview(
@@ -362,6 +355,7 @@
self,
example: Example[Input, ExpectedOutput],
evaluation_id: str,
abort_on_error: bool,
*example_outputs: SuccessfulExampleOutput[Output],
) -> None:
try:
@@ -372,13 +366,40 @@
)
)
except Exception as e:
if abort_on_error:
raise e
print(
f'FAILED EVALUATION: example "{example.id}", {type(e).__qualname__}: "{e}"'
)
result = FailedExampleEvaluation.from_exception(e)
self._evaluation_repository.store_example_evaluation(
ExampleEvaluation(
evaluation_id=evaluation_id, example_id=example.id, result=result
)
)

def failed_evaluations(
self, evaluation_id: str
) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]:
"""Returns the `EvaluationLineage` objects for all failed example evalations that belong to the given evaluation ID.
Args:
evaluation_id: The ID of the evaluation overview
Returns:
:class:`Iterable` of :class:`EvaluationLineage`s.
"""
failed_example_evaluations = (
self._evaluation_repository.failed_example_evaluations(
evaluation_id, evaluation_type=self.evaluation_type()
)
)
lineages = (
self.evaluation_lineage(evaluation_id, output.example_id)
for output in failed_example_evaluations
)
return (lineage for lineage in lineages if lineage is not None)

def evaluation_lineages(
self, evaluation_id: str
) -> Iterable[EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]]:
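For illustration, a hedged sketch of calling the new `Evaluator.failed_evaluations` accessor shown above (the `evaluator` instance and the `evaluation_id` are assumptions; only the method itself is from this commit):

```python
# Sketch only: `evaluator` and `evaluation_id` are assumed to exist elsewhere.
# failed_evaluations() yields an EvaluationLineage for every failed example evaluation.
for lineage in evaluator.failed_evaluations(evaluation_id):
    print(lineage)  # inspect the example, its outputs, and the stored error message
```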
5 changes: 4 additions & 1 deletion src/intelligence_layer/evaluation/run/domain.py
@@ -1,3 +1,4 @@
import traceback
from datetime import datetime
from typing import Generic

@@ -18,7 +19,9 @@ class FailedExampleRun(BaseModel):

@staticmethod
def from_exception(exception: Exception) -> "FailedExampleRun":
return FailedExampleRun(error_message=f"{type(exception)}: {str(exception)}")
return FailedExampleRun(
error_message=f"{type(exception)}: {str(exception)}\n{traceback.format_exc()}"
)


class ExampleOutput(BaseModel, Generic[Output]):
36 changes: 35 additions & 1 deletion src/intelligence_layer/evaluation/run/run_repository.py
@@ -2,7 +2,11 @@
from typing import Iterable, Optional, Sequence

from intelligence_layer.core import Output, Tracer
from intelligence_layer.evaluation.run.domain import ExampleOutput, RunOverview
from intelligence_layer.evaluation.run.domain import (
ExampleOutput,
FailedExampleRun,
RunOverview,
)
from intelligence_layer.evaluation.run.trace import ExampleTrace


@@ -132,3 +136,33 @@ def example_output_ids(self, run_id: str) -> Sequence[str]:
A :class:`Sequence` of all :class:`ExampleOutput` IDs.
"""
...

def successful_example_outputs(
self, run_id: str, output_type: type[Output]
) -> Iterable[ExampleOutput[Output]]:
"""Returns all :class:`ExampleOutput` for successful example runs with a given run-overview ID sorted by their example ID.
Args:
run_id: The ID of the run overview.
output_type: Type of output that the `Task` returned in :func:`Task.do_run`
Returns:
:class:`Iterable` of :class:`ExampleOutput`s.
"""
results = self.example_outputs(run_id, output_type)
return (r for r in results if not isinstance(r.output, FailedExampleRun))

def failed_example_outputs(
self, run_id: str, output_type: type[Output]
) -> Iterable[ExampleOutput[Output]]:
"""Returns all :class:`ExampleOutput` for failed example runs with a given run-overview ID sorted by their example ID.
Args:
run_id: The ID of the run overview.
output_type: Type of output that the `Task` returned in :func:`Task.do_run`
Returns:
:class:`Iterable` of :class:`ExampleOutput`s.
"""
results = self.example_outputs(run_id, output_type)
return (r for r in results if isinstance(r.output, FailedExampleRun))
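A short sketch of the two repository accessors defined above (the `run_repository` instance, the `run_id`, and the task's output type `MyOutput` are assumptions; the method names and signatures are the ones added here):

```python
# Sketch only: `run_repository`, `run_id`, and the task output type `MyOutput`
# are assumed to be defined elsewhere.
succeeded = list(run_repository.successful_example_outputs(run_id, MyOutput))
failed = list(run_repository.failed_example_outputs(run_id, MyOutput))
print(f"{len(succeeded)} examples succeeded, {len(failed)} failed")
```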
29 changes: 28 additions & 1 deletion src/intelligence_layer/evaluation/run/runner.py
@@ -75,6 +75,7 @@ def run_dataset(
dataset_id: str,
tracer: Optional[Tracer] = None,
num_examples: Optional[int] = None,
abort_on_error: bool = False,
) -> RunOverview:
"""Generates all outputs for the provided dataset.
@@ -86,6 +87,7 @@
tracer: An optional :class:`Tracer` to trace all the runs from each example
num_examples: An optional int to specify how many examples from the dataset should be run.
Always the first n examples will be taken.
abort_on_error: Flag to abort all runs when an error occurs. Defaults to False.
Returns:
An overview of the run. Outputs will not be returned but instead stored in the
@@ -101,7 +103,11 @@ def run(
try:
return example.id, self._task.run(example.input, evaluate_tracer)
except Exception as e:
print(e)
if abort_on_error:
raise e
print(
f'FAILED RUN: example "{example.id}", {type(e).__qualname__}: "{e}"'
)
return example.id, FailedExampleRun.from_exception(e)

# mypy does not like union types
@@ -144,6 +150,27 @@ def run(
self._run_repository.store_run_overview(run_overview)
return run_overview

def failed_runs(
self, run_id: str, expected_output_type: type[ExpectedOutput]
) -> Iterable[RunLineage[Input, ExpectedOutput, Output]]:
"""Returns the `RunLineage` objects for all failed example runs that belong to the given run ID.
Args:
run_id: The ID of the run overview
expected_output_type: Type of output that the `Task` returned in :func:`Task.do_run`
Returns:
:class:`Iterable` of :class:`RunLineage`s.
"""
failed_example_outputs = self._run_repository.failed_example_outputs(
run_id, output_type=self.output_type()
)
lineages = (
self.run_lineage(run_id, output.example_id, expected_output_type)
for output in failed_example_outputs
)
return (lineage for lineage in lineages if lineage is not None)

def run_lineages(
self,
run_id: str,
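Likewise, a hedged sketch of `Runner.failed_runs` (the `runner` instance and `run_id` are assumptions; `type(None)` fits datasets whose examples carry no expected output, as in this commit's test fixtures):

```python
# Sketch only: `runner` and `run_id` are assumed to exist elsewhere.
# failed_runs() yields a RunLineage for every example whose task run raised an exception.
for lineage in runner.failed_runs(run_id, expected_output_type=type(None)):
    print(lineage)  # each lineage pairs the example with its FailedExampleRun
```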
9 changes: 9 additions & 0 deletions tests/evaluation/conftest.py
@@ -60,6 +60,15 @@ class DummyAggregatedEvaluationWithResultList(BaseModel):
results: Sequence[DummyEvaluation]


@fixture
def sequence_examples() -> Iterable[Example[str, None]]:
return [
Example(input="success", expected_output=None, id="example-1"),
Example(input=FAIL_IN_TASK_INPUT, expected_output=None, id="example-2"),
Example(input=FAIL_IN_EVAL_INPUT, expected_output=None, id="example-3"),
]


@fixture
def evaluation_id() -> str:
return "evaluation-id-1"