feat: show latency and token count in benchmark execution detail view #1185

Merged
6 changes: 3 additions & 3 deletions src/intelligence_layer/connectors/studio/studio.py
@@ -122,8 +122,8 @@ class PostBenchmarkExecution(BaseModel):
run_end: datetime
run_successful_count: int
run_failed_count: int
run_success_avg_latency: int
run_success_avg_token_count: int
run_success_avg_latency: float
run_success_avg_token_count: float
# Eval Overview
eval_start: datetime
eval_end: datetime
@@ -143,7 +143,7 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]):
items: Sequence[StudioExample[Input, ExpectedOutput]]


class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]):
class BenchmarkLineage(BaseModel, Generic[Input, ExpectedOutput, Output, Evaluation]):
trace_id: str
input: Input
expected_output: ExpectedOutput
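Why the field types above change from int to float: the mean of per-example latencies or token counts is generally not a whole number. A minimal sketch with hypothetical values (not taken from this PR):

# Hypothetical per-example latencies in microseconds for three successful runs.
latencies = [1200, 950, 1013]
# The mean is 1054.33..., which an int-typed field would truncate or reject.
run_success_avg_latency = sum(latencies) / len(latencies)
print(run_success_avg_latency)  # 1054.3333333333333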
104 changes: 88 additions & 16 deletions src/intelligence_layer/evaluation/benchmark/studio_benchmark.py
@@ -1,3 +1,4 @@
import itertools
from collections.abc import Sequence
from datetime import datetime
from http import HTTPStatus
@@ -16,6 +17,7 @@
)
from intelligence_layer.core import Input, Output
from intelligence_layer.core.task import Task
from intelligence_layer.core.tracer.tracer import ExportedSpan
from intelligence_layer.evaluation.aggregation.aggregator import (
AggregationLogic,
Aggregator,
@@ -29,6 +31,10 @@
BenchmarkRepository,
)
from intelligence_layer.evaluation.benchmark.get_code import get_source_notebook_safe
from intelligence_layer.evaluation.benchmark.trace_information import (
extract_latency_from_trace,
extract_token_count_from_trace,
)
from intelligence_layer.evaluation.dataset.domain import ExpectedOutput
from intelligence_layer.evaluation.dataset.studio_dataset_repository import (
StudioDatasetRepository,
@@ -118,7 +124,30 @@ def execute(

end = datetime.now()

data = PostBenchmarkExecution(
evaluation_lineages = list(
self.evaluator.evaluation_lineages(evaluation_overview.id)
)

run_traces = [
self._trace_from_lineage(lineage) for lineage in evaluation_lineages
]
tokens_per_trace = [
extract_token_count_from_trace(trace) for trace in run_traces
]
latency_per_trace = [extract_latency_from_trace(trace) for trace in run_traces]

tokens_per_successful_trace, latency_per_successful_trace = (
self._filter_for_successful_runs(
(tokens_per_trace, latency_per_trace),
source_lineage_list=evaluation_lineages,
run_id=run_overview.id,
)
)

def average_or_zero(values: list) -> float:
return sum(values) / len(values) if len(values) > 0 else 0

benchmark_execution_data = PostBenchmarkExecution(
name=name,
description=description,
labels=labels,
@@ -129,8 +158,8 @@ def execute(
run_end=run_overview.end,
run_successful_count=run_overview.successful_example_count,
run_failed_count=run_overview.failed_example_count,
run_success_avg_latency=0, # TODO: Implement this
run_success_avg_token_count=0, # TODO: Implement this
run_success_avg_latency=average_or_zero(latency_per_successful_trace),
run_success_avg_token_count=average_or_zero(tokens_per_successful_trace),
eval_start=evaluation_overview.start_date,
eval_end=evaluation_overview.end_date,
eval_successful_count=evaluation_overview.successful_evaluation_count,
@@ -141,23 +170,21 @@ def execute(
)

benchmark_execution_id = self.client.submit_benchmark_execution(
benchmark_id=self.id, data=data
benchmark_id=self.id, data=benchmark_execution_data
)

evaluation_lineages = list(
self.evaluator.evaluation_lineages(evaluation_overview.id)
)
trace_ids = []
for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"):
trace = lineage.tracers[0]
assert trace
trace_id = self.client.submit_trace(trace.export_for_viewing())
for trace in tqdm(run_traces, desc="Submitting traces to Studio"):
trace_id = self.client.submit_trace(trace)
trace_ids.append(trace_id)

benchmark_lineages = self._create_benchmark_lineages(
eval_lineages=evaluation_lineages,
trace_ids=trace_ids,
latencies_per_trace=latency_per_trace,
tokens_per_trace=tokens_per_trace,
)

self.client.submit_benchmark_lineages(
benchmark_lineages=benchmark_lineages,
execution_id=benchmark_execution_id,
@@ -166,22 +193,67 @@ def execute(

return benchmark_execution_id

def _filter_for_successful_runs(
self,
lists_to_filter: tuple[list, ...],
source_lineage_list: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
run_id: str,
) -> tuple[list, ...]:
"""This method assumes that lists_to_filter and source_lineage_list are all equal length."""
failed_example_output_ids = [
example_output.example_id
for example_output in self.run_repository.failed_example_outputs(
run_id=run_id, output_type=self.evaluator.output_type()
)
]

is_successful_run = [
lineage.example.id not in failed_example_output_ids
for lineage in source_lineage_list
]
return tuple(
list(itertools.compress(sublist, is_successful_run))
for sublist in lists_to_filter
)

def _trace_from_lineage(
self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
) -> Sequence[ExportedSpan]:
# Since there is only one output in this scenario, we expect exactly one tracer.
trace = eval_lineage.tracers[0]
assert trace, "eval lineages should always have at least 1 tracer"
return trace.export_for_viewing()

def _create_benchmark_lineages(
self,
eval_lineages: list[
EvaluationLineage[Input, ExpectedOutput, Output, Evaluation]
],
trace_ids: list[str],
) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]:
latencies_per_trace: list[int],
tokens_per_trace: list[int],
) -> Sequence[BenchmarkLineage[Input, ExpectedOutput, Output, Evaluation]]:
return [
self._create_benchmark_lineage(eval_lineage, trace_id)
for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True)
self._create_benchmark_lineage(
eval_lineage, trace_id, run_latency, run_tokens
)
for eval_lineage, trace_id, run_latency, run_tokens in zip(
eval_lineages,
trace_ids,
latencies_per_trace,
tokens_per_trace,
strict=True,
)
]

def _create_benchmark_lineage(
self,
eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation],
trace_id: str,
run_latency: int,
run_tokens: int,
) -> BenchmarkLineage:
return BenchmarkLineage(
trace_id=trace_id,
Expand All @@ -190,8 +262,8 @@ def _create_benchmark_lineage(
example_metadata=eval_lineage.example.metadata,
output=eval_lineage.outputs[0].output,
evaluation=eval_lineage.evaluation.result,
run_latency=0, # TODO: Implement this
run_tokens=0, # TODO: Implement this
run_latency=run_latency,
run_tokens=run_tokens,
)


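The filtering above keeps only the measurements belonging to successful runs: a boolean mask is derived from the failed example ids and applied to each measurement list with itertools.compress. A standalone sketch of that pattern with hypothetical data (the real code builds the mask from run_repository.failed_example_outputs):

import itertools

# Hypothetical per-trace measurements, index-aligned with the evaluation lineages.
tokens_per_trace = [120, 95, 87]
latency_per_trace = [1500, 900, 1100]  # microseconds

# True where the corresponding example did not appear among the failed outputs.
is_successful_run = [True, False, True]

tokens_ok, latency_ok = (
    list(itertools.compress(values, is_successful_run))
    for values in (tokens_per_trace, latency_per_trace)
)

def average_or_zero(values: list) -> float:
    return sum(values) / len(values) if len(values) > 0 else 0

print(average_or_zero(tokens_ok))   # 103.5
print(average_or_zero(latency_ok))  # 1300.0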
66 changes: 66 additions & 0 deletions src/intelligence_layer/evaluation/benchmark/trace_information.py
@@ -0,0 +1,66 @@
from collections.abc import Sequence
from datetime import timedelta
from typing import cast

from aleph_alpha_client import CompletionResponse

from intelligence_layer.core import ExportedSpan
from intelligence_layer.core.model import _Complete
from intelligence_layer.core.tracer.tracer import SpanType


def _get_root(trace: Sequence[ExportedSpan]) -> ExportedSpan | None:
root_spans = [span for span in trace if span.parent_id is None]
if len(root_spans) != 1:
return None
return root_spans[0]


def extract_latency_from_trace(trace: Sequence[ExportedSpan]) -> int:
"""Extract the total duration of a given trace based on its root trace.

Args:
trace: trace to analyze

Returns:
The duration of the trace in microseconds
"""
root_span = _get_root(trace)
if root_span is None:
raise ValueError("No root span found in the trace")
latency = (root_span.end_time - root_span.start_time) / timedelta(microseconds=1)
return int(latency)


def _is_complete_request(span: ExportedSpan) -> bool:
# Completion requests are identified by their span name matching the _Complete task.
return span.name == _Complete.__name__


def _extract_tokens_from_complete_request(span: ExportedSpan) -> int:
if not hasattr(span.attributes, "output"):
raise ValueError(
"Function expects a complete span with attributes.output. Output was not present."
)
completion_output = cast(CompletionResponse, span.attributes.output)
return completion_output.num_tokens_generated


def extract_token_count_from_trace(trace: Sequence[ExportedSpan]) -> int:
"""Extract the number of tokens generated in a trace based on its completion requests.

Note: Does not support traces of streamed responses.

Args:
trace: trace to analyze.

Returns:
The sum of newly generated tokens across all spans in the given trace.
"""
token_count = 0
for span in trace:
if span.attributes.type != SpanType.TASK_SPAN:
continue
if _is_complete_request(span):
token_count += _extract_tokens_from_complete_request(span)
return token_count
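Two details of the helpers above, shown standalone: the latency is the root span's duration converted to microseconds by dividing one timedelta by timedelta(microseconds=1), and the token count is the sum of num_tokens_generated over all Complete task spans in the trace. A sketch with hypothetical numbers:

from datetime import datetime, timedelta

# Latency: dividing a timedelta by timedelta(microseconds=1) gives the duration in microseconds.
start = datetime(2024, 1, 1, 12, 0, 0)
end = start + timedelta(milliseconds=250)
latency_us = int((end - start) / timedelta(microseconds=1))
assert latency_us == 250_000

# Token count: stand-ins for CompletionResponse.num_tokens_generated of three
# Complete spans in one trace; spans of other types are skipped by the helper.
tokens_per_complete_span = [17, 42, 5]
assert sum(tokens_per_complete_span) == 64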