diff --git a/src/intelligence_layer/connectors/studio/studio.py b/src/intelligence_layer/connectors/studio/studio.py index d5f88671..a72ed3da 100644 --- a/src/intelligence_layer/connectors/studio/studio.py +++ b/src/intelligence_layer/connectors/studio/studio.py @@ -122,8 +122,8 @@ class PostBenchmarkExecution(BaseModel): run_end: datetime run_successful_count: int run_failed_count: int - run_success_avg_latency: int - run_success_avg_token_count: int + run_success_avg_latency: float + run_success_avg_token_count: float # Eval Overview eval_start: datetime eval_end: datetime @@ -143,7 +143,7 @@ class GetDatasetExamplesResponse(BaseModel, Generic[Input, ExpectedOutput]): items: Sequence[StudioExample[Input, ExpectedOutput]] -class BenchmarkLineage(BaseModel, Generic[Input, Output, ExpectedOutput, Evaluation]): +class BenchmarkLineage(BaseModel, Generic[Input, ExpectedOutput, Output, Evaluation]): trace_id: str input: Input expected_output: ExpectedOutput diff --git a/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py b/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py index d88c35e7..ca6cf80d 100644 --- a/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py +++ b/src/intelligence_layer/evaluation/benchmark/studio_benchmark.py @@ -1,3 +1,4 @@ +import itertools from collections.abc import Sequence from datetime import datetime from http import HTTPStatus @@ -16,6 +17,7 @@ ) from intelligence_layer.core import Input, Output from intelligence_layer.core.task import Task +from intelligence_layer.core.tracer.tracer import ExportedSpan from intelligence_layer.evaluation.aggregation.aggregator import ( AggregationLogic, Aggregator, @@ -29,6 +31,10 @@ BenchmarkRepository, ) from intelligence_layer.evaluation.benchmark.get_code import get_source_notebook_safe +from intelligence_layer.evaluation.benchmark.trace_information import ( + extract_latency_from_trace, + extract_token_count_from_trace, +) from intelligence_layer.evaluation.dataset.domain import ExpectedOutput from intelligence_layer.evaluation.dataset.studio_dataset_repository import ( StudioDatasetRepository, @@ -118,7 +124,30 @@ def execute( end = datetime.now() - data = PostBenchmarkExecution( + evaluation_lineages = list( + self.evaluator.evaluation_lineages(evaluation_overview.id) + ) + + run_traces = [ + self._trace_from_lineage(lineage) for lineage in evaluation_lineages + ] + tokens_per_trace = [ + extract_token_count_from_trace(trace) for trace in run_traces + ] + latency_per_trace = [extract_latency_from_trace(trace) for trace in run_traces] + + tokens_per_successful_trace, latency_per_successful_trace = ( + self._filter_for_succesful_runs( + (tokens_per_trace, latency_per_trace), + source_lineage_list=evaluation_lineages, + run_id=run_overview.id, + ) + ) + + def average_or_zero(list: list) -> float: + return sum(list) / len(list) if len(list) > 0 else 0 + + benchmark_execution_data = PostBenchmarkExecution( name=name, description=description, labels=labels, @@ -129,8 +158,8 @@ def execute( run_end=run_overview.end, run_successful_count=run_overview.successful_example_count, run_failed_count=run_overview.failed_example_count, - run_success_avg_latency=0, # TODO: Implement this - run_success_avg_token_count=0, # TODO: Implement this + run_success_avg_latency=average_or_zero(latency_per_successful_trace), + run_success_avg_token_count=average_or_zero(tokens_per_successful_trace), eval_start=evaluation_overview.start_date, eval_end=evaluation_overview.end_date, eval_successful_count=evaluation_overview.successful_evaluation_count, @@ -141,23 +170,21 @@ def execute( ) benchmark_execution_id = self.client.submit_benchmark_execution( - benchmark_id=self.id, data=data + benchmark_id=self.id, data=benchmark_execution_data ) - evaluation_lineages = list( - self.evaluator.evaluation_lineages(evaluation_overview.id) - ) trace_ids = [] - for lineage in tqdm(evaluation_lineages, desc="Submitting traces to Studio"): - trace = lineage.tracers[0] - assert trace - trace_id = self.client.submit_trace(trace.export_for_viewing()) + for trace in tqdm(run_traces, desc="Submitting traces to Studio"): + trace_id = self.client.submit_trace(trace) trace_ids.append(trace_id) benchmark_lineages = self._create_benchmark_lineages( eval_lineages=evaluation_lineages, trace_ids=trace_ids, + latencies_per_trace=latency_per_trace, + tokens_per_trace=tokens_per_trace, ) + self.client.submit_benchmark_lineages( benchmark_lineages=benchmark_lineages, execution_id=benchmark_execution_id, @@ -166,22 +193,67 @@ def execute( return benchmark_execution_id + def _filter_for_succesful_runs( + self, + lists_to_filter: tuple[list, ...], + source_lineage_list: list[ + EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] + ], + run_id: str, + ) -> tuple[list, ...]: + """This method assumes that lists_to_filter and source_lineage_list are all equal length.""" + failed_example_output_ids = [ + example_output.example_id + for example_output in self.run_repository.failed_example_outputs( + run_id=run_id, output_type=self.evaluator.output_type() + ) + ] + + is_successful_run = [ + lineage.example.id not in failed_example_output_ids + for lineage in source_lineage_list + ] + return tuple( + list(itertools.compress(sublist, is_successful_run)) + for sublist in lists_to_filter + ) + + def _trace_from_lineage( + self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] + ) -> Sequence[ExportedSpan]: + # since we have 1 output only in this scenario, we expected to have exactly 1 tracer + trace = eval_lineage.tracers[0] + assert trace, "eval lineages always should have at least 1 tracer" + return trace.export_for_viewing() + def _create_benchmark_lineages( self, eval_lineages: list[ EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] ], trace_ids: list[str], - ) -> Sequence[BenchmarkLineage[Input, Output, ExpectedOutput, Evaluation]]: + latencies_per_trace: list[int], + tokens_per_trace: list[int], + ) -> Sequence[BenchmarkLineage[Input, ExpectedOutput, Output, Evaluation]]: return [ - self._create_benchmark_lineage(eval_lineage, trace_id) - for eval_lineage, trace_id in zip(eval_lineages, trace_ids, strict=True) + self._create_benchmark_lineage( + eval_lineage, trace_id, run_latency, run_tokens + ) + for eval_lineage, trace_id, run_latency, run_tokens in zip( + eval_lineages, + trace_ids, + latencies_per_trace, + tokens_per_trace, + strict=True, + ) ] def _create_benchmark_lineage( self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation], trace_id: str, + run_latency: int, + run_tokens: int, ) -> BenchmarkLineage: return BenchmarkLineage( trace_id=trace_id, @@ -190,8 +262,8 @@ def _create_benchmark_lineage( example_metadata=eval_lineage.example.metadata, output=eval_lineage.outputs[0].output, evaluation=eval_lineage.evaluation.result, - run_latency=0, # TODO: Implement this - run_tokens=0, # TODO: Implement this + run_latency=run_latency, + run_tokens=run_tokens, ) diff --git a/src/intelligence_layer/evaluation/benchmark/trace_information.py b/src/intelligence_layer/evaluation/benchmark/trace_information.py new file mode 100644 index 00000000..a4f73eec --- /dev/null +++ b/src/intelligence_layer/evaluation/benchmark/trace_information.py @@ -0,0 +1,66 @@ +from collections.abc import Sequence +from datetime import timedelta +from typing import cast + +from aleph_alpha_client import CompletionResponse + +from intelligence_layer.core import ExportedSpan +from intelligence_layer.core.model import _Complete +from intelligence_layer.core.tracer.tracer import SpanType + + +def _get_root(trace: Sequence[ExportedSpan]) -> ExportedSpan | None: + root_spans = [span for span in trace if span.parent_id is None] + if len(root_spans) != 1: + return None + return root_spans[0] + + +def extract_latency_from_trace(trace: Sequence[ExportedSpan]) -> int: + """Extract the total duration of a given trace based on its root trace. + + Args: + trace: trace to analyze + + Returns: + The duration of the trace in microseconds + """ + root_span = _get_root(trace) + if root_span is None: + raise ValueError("No root span found in the trace") + latency = (root_span.end_time - root_span.start_time) / timedelta(microseconds=1) + return int(latency) + + +def _is_complete_request(span: ExportedSpan) -> bool: + # Assuming that LLM requests have a specific name or attribute + return span.name == _Complete.__name__ + + +def _extract_tokens_from_complete_request(span: ExportedSpan) -> int: + if not hasattr(span.attributes, "output"): + raise ValueError( + "Function expects a complete span with attributes.output. Output was not present." + ) + completion_output = cast(CompletionResponse, span.attributes.output) + return completion_output.num_tokens_generated + + +def extract_token_count_from_trace(trace: Sequence[ExportedSpan]) -> int: + """Extract the number of tokens generated in a trace based on its completion requests. + + Note: Does not support traces of streamed responses. + + Args: + trace: trace to analyze. + + Returns: + The sum of newly generated tokens across all spans in the given trace. + """ + token_count = 0 + for span in trace: + if span.attributes.type != SpanType.TASK_SPAN: + continue + if _is_complete_request(span): + token_count += _extract_tokens_from_complete_request(span) + return token_count diff --git a/tests/evaluation/benchmark/test_benchmark.py b/tests/evaluation/benchmark/test_benchmark.py index da3cb932..81afd027 100644 --- a/tests/evaluation/benchmark/test_benchmark.py +++ b/tests/evaluation/benchmark/test_benchmark.py @@ -1,5 +1,6 @@ from datetime import datetime -from typing import Optional +from typing import Optional, cast +from unittest.mock import Mock, patch from uuid import UUID, uuid4 import pytest @@ -8,7 +9,9 @@ from requests import HTTPError, Response from intelligence_layer.connectors.studio.studio import ( + BenchmarkLineage, GetBenchmarkResponse, + PostBenchmarkExecution, StudioClient, StudioExample, ) @@ -19,6 +22,7 @@ type_to_schema, ) from tests.evaluation.conftest import ( + FAIL_IN_TASK_INPUT, DummyAggregationLogic, DummyEvaluationLogic, DummyTask, @@ -140,7 +144,8 @@ def test_create_benchmark( aggregation_logic: DummyAggregationLogic, ) -> None: dataset_id = "fake_dataset_id" - mock_studio_client.submit_benchmark.return_value = str(uuid4()) # type: ignore + mock_submit_benchmark = cast(Mock, mock_studio_client.submit_benchmark) + mock_submit_benchmark.return_value = str(uuid4()) benchmark = studio_benchmark_repository.create_benchmark( dataset_id, evaluation_logic, aggregation_logic, "benchmark_name" @@ -148,7 +153,7 @@ def test_create_benchmark( uuid = UUID(benchmark.id) assert uuid assert benchmark.dataset_id == dataset_id - studio_benchmark_repository.client.submit_benchmark.assert_called_once() # type: ignore + mock_submit_benchmark.assert_called_once() def test_create_benchmark_with_non_existing_dataset( @@ -161,7 +166,7 @@ def test_create_benchmark_with_non_existing_dataset( response = Response() response.status_code = 400 - mock_studio_client.submit_benchmark.side_effect = HTTPError( # type: ignore + cast(Mock, mock_studio_client.submit_benchmark).side_effect = HTTPError( "400 Client Error: Bad Request for url", response=response ) @@ -180,7 +185,7 @@ def test_get_benchmark( datatset_id: str, ) -> None: benchmark_id = "benchmark_id" - mock_studio_client.get_benchmark.return_value = get_benchmark_response # type: ignore + cast(Mock, mock_studio_client.get_benchmark).return_value = get_benchmark_response benchmark = studio_benchmark_repository.get_benchmark( benchmark_id, evaluation_logic, aggregation_logic @@ -199,7 +204,7 @@ def test_get_non_existing_benchmark( evaluation_logic: DummyEvaluationLogic, aggregation_logic: DummyAggregationLogic, ) -> None: - mock_studio_client.get_benchmark.return_value = None # type: ignore + cast(Mock, mock_studio_client.get_benchmark).return_value = None assert ( studio_benchmark_repository.get_benchmark( @@ -209,29 +214,134 @@ def test_get_non_existing_benchmark( ) +@patch( + "intelligence_layer.evaluation.benchmark.studio_benchmark.extract_token_count_from_trace" +) def test_execute_benchmark( + mock_extract_tokens: Mock, studio_benchmark_repository: StudioBenchmarkRepository, mock_studio_client: StudioClient, evaluation_logic: DummyEvaluationLogic, get_benchmark_response: GetBenchmarkResponse, aggregation_logic: DummyAggregationLogic, - task, + task: DummyTask, ) -> None: - mock_studio_client.get_benchmark.return_value = get_benchmark_response # type: ignore - mock_studio_client.submit_trace.return_value = str(uuid4()) # type: ignore + mock_submit_trace = cast(Mock, mock_studio_client.submit_trace) + mock_submit_trace.return_value = str(uuid4()) + mock_submit_execution = cast(Mock, mock_studio_client.submit_benchmark_execution) + mock_submit_lineage = cast(Mock, mock_studio_client.submit_benchmark_lineages) + + expected_generated_tokens = 100 + mock_extract_tokens.return_value = expected_generated_tokens + + cast(Mock, mock_studio_client.get_benchmark).return_value = get_benchmark_response examples = [ StudioExample(input="input0", expected_output="expected_output0"), StudioExample(input="input1", expected_output="expected_output1"), StudioExample(input="input2", expected_output="expected_output2"), StudioExample(input="input3", expected_output="expected_output3"), ] + cast(Mock, mock_studio_client.get_dataset_examples).return_value = examples + benchmark = studio_benchmark_repository.get_benchmark( + "benchmark_id", evaluation_logic, aggregation_logic + ) + assert benchmark + + # when + benchmark.execute( + task, + name="name", + description="description", + metadata={"key": "value"}, + labels={"label"}, + ) - mock_studio_client.get_dataset_examples.return_value = examples # type: ignore + # then + mock_submit_execution.assert_called_once() + uploaded_execution = cast( + PostBenchmarkExecution, mock_submit_execution.call_args[1]["data"] + ) + assert uploaded_execution.run_success_avg_latency > 0 + assert uploaded_execution.run_success_avg_token_count == expected_generated_tokens + + assert mock_submit_trace.call_count == 4 + + mock_submit_lineage.assert_called_once() + uploaded_lineages = mock_submit_lineage.call_args[1]["benchmark_lineages"] + for lineage in uploaded_lineages: + lineage = cast(BenchmarkLineage, lineage) + assert lineage.run_latency > 0 + # this assumes that each lineage consists of traces that only have a single span + assert lineage.run_tokens == expected_generated_tokens + + +def test_execute_benchmark_on_empty_examples_uploads_example_and_calculates_correctly( + studio_benchmark_repository: StudioBenchmarkRepository, + mock_studio_client: StudioClient, + evaluation_logic: DummyEvaluationLogic, + get_benchmark_response: GetBenchmarkResponse, + aggregation_logic: DummyAggregationLogic, + task: DummyTask, +) -> None: + mock_submit_trace = cast(Mock, mock_studio_client.submit_trace) + mock_submit_execution = cast(Mock, mock_studio_client.submit_benchmark_execution) + + cast(Mock, mock_studio_client.get_benchmark).return_value = get_benchmark_response + cast(Mock, mock_studio_client.get_dataset_examples).return_value = [] benchmark = studio_benchmark_repository.get_benchmark( "benchmark_id", evaluation_logic, aggregation_logic ) + assert benchmark + # when + benchmark.execute( + task, + name="name", + description="description", + metadata={"key": "value"}, + labels={"label"}, + ) + + # then + mock_submit_execution.assert_called_once() + uploaded_execution = cast( + PostBenchmarkExecution, mock_submit_execution.call_args[1]["data"] + ) + assert uploaded_execution.run_success_avg_latency == 0 + assert uploaded_execution.run_success_avg_token_count == 0 + + assert mock_submit_trace.call_count == 0 + + +@patch( + "intelligence_layer.evaluation.benchmark.studio_benchmark.extract_token_count_from_trace" +) +def test_execute_benchmark_failing_examples_calculates_correctly( + mock_extract_tokens: Mock, + studio_benchmark_repository: StudioBenchmarkRepository, + mock_studio_client: StudioClient, + evaluation_logic: DummyEvaluationLogic, + get_benchmark_response: GetBenchmarkResponse, + aggregation_logic: DummyAggregationLogic, + task: DummyTask, +) -> None: + mock_submit_trace = cast(Mock, mock_studio_client.submit_trace) + mock_submit_execution = cast(Mock, mock_studio_client.submit_benchmark_execution) + + cast(Mock, mock_studio_client.get_benchmark).return_value = get_benchmark_response + examples = [ + StudioExample(input=FAIL_IN_TASK_INPUT, expected_output="expected_output0"), + ] + cast(Mock, mock_studio_client.get_dataset_examples).return_value = examples + benchmark = studio_benchmark_repository.get_benchmark( + "benchmark_id", evaluation_logic, aggregation_logic + ) + + expected_generated_tokens = 0 + mock_extract_tokens.return_value = expected_generated_tokens + 1 assert benchmark + + # when benchmark.execute( task, name="name", @@ -240,6 +350,13 @@ def test_execute_benchmark( labels={"label"}, ) - mock_studio_client.submit_benchmark_execution.assert_called_once() # type: ignore - assert mock_studio_client.submit_trace.call_count == 4 # type: ignore - mock_studio_client.submit_benchmark_lineages.assert_called_once() # type: ignore + # then + mock_submit_execution.assert_called_once() + uploaded_execution = cast( + PostBenchmarkExecution, mock_submit_execution.call_args[1]["data"] + ) + assert uploaded_execution.run_success_avg_latency == 0 + assert uploaded_execution.run_success_avg_token_count == expected_generated_tokens + assert uploaded_execution.run_successful_count == 0 + + assert mock_submit_trace.call_count == 0 diff --git a/tests/evaluation/benchmark/test_trace_information.py b/tests/evaluation/benchmark/test_trace_information.py new file mode 100644 index 00000000..b115448c --- /dev/null +++ b/tests/evaluation/benchmark/test_trace_information.py @@ -0,0 +1,158 @@ +from collections.abc import Mapping, Sequence +from datetime import datetime, timedelta, timezone +from typing import Any +from uuid import uuid4 + +import pytest +from aleph_alpha_client import CompletionRequest, CompletionResponse +from aleph_alpha_client.completion import CompletionResult + +from intelligence_layer.connectors.limited_concurrency_client import ( + AlephAlphaClientProtocol, +) +from intelligence_layer.core import ( + Context, + ExportedSpan, + SpanAttributes, + SpanStatus, + TaskSpanAttributes, +) +from intelligence_layer.core.model import CompleteInput, Message, Pharia1ChatModel +from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer +from intelligence_layer.evaluation.benchmark.trace_information import ( + extract_latency_from_trace, + extract_token_count_from_trace, +) + + +@pytest.fixture +def root_span() -> ExportedSpan: + trace_id = uuid4() + return ExportedSpan( + context=Context( + trace_id=trace_id, + span_id=trace_id, + ), + name="root", + parent_id=None, + start_time=datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + end_time=datetime(2022, 1, 1, 12, 0, 1, tzinfo=timezone.utc), + attributes=SpanAttributes(), + events=[], + status=SpanStatus.OK, + ) + + +@pytest.fixture +def child_span(root_span: ExportedSpan) -> ExportedSpan: + return ExportedSpan( + context=Context( + trace_id=root_span.context.trace_id, + span_id=uuid4(), + ), + name="child", + parent_id=root_span.context.span_id, + start_time=datetime(2022, 1, 1, 12, 0, 0, 500000, tzinfo=timezone.utc), + end_time=datetime(2022, 1, 1, 12, 0, 1, 500000, tzinfo=timezone.utc), + attributes=TaskSpanAttributes( + input="input", + output="output", + ), + events=[], + status=SpanStatus.OK, + ) + + +SECOND_IN_MICROSECOND = 10**6 + + +def test_extract_latency_from_trace_root_span(root_span: ExportedSpan): + latency = extract_latency_from_trace([root_span]) + assert latency == SECOND_IN_MICROSECOND + + +def test_extract_latency_from_trace_root_span_can_do_ns_differences( + root_span: ExportedSpan, +): + root_span.end_time = root_span.start_time + timedelta(microseconds=1) + latency = extract_latency_from_trace([root_span]) + assert latency == 1 + + +def test_extract_latency_from_trace_root_span_with_child( + root_span: ExportedSpan, child_span: ExportedSpan +): + latency = extract_latency_from_trace([root_span, child_span]) + assert latency == SECOND_IN_MICROSECOND + + latency = extract_latency_from_trace([child_span, root_span]) + assert latency == SECOND_IN_MICROSECOND + + +def test_extract_latency_from_trace_no_root_span(child_span: ExportedSpan): + # Call the function with the child span + with pytest.raises(ValueError): + extract_latency_from_trace([child_span]) + + +class MockClient(AlephAlphaClientProtocol): + def __init__(self, generated_tokens: int): + self.generated_tokens = generated_tokens + + def complete( + self, + request: CompletionRequest, + model: str, + ) -> CompletionResponse: + return CompletionResponse( + model_version="---", + completions=[CompletionResult()], + num_tokens_generated=self.generated_tokens, + num_tokens_prompt_total=20, + ) + + def models(self) -> Sequence[Mapping[str, Any]]: + return [{"name": "pharia-1-llm-7b-control"}] + + +def test_extract_token_count_from_trace_works_without_llm_spans( + root_span: ExportedSpan, +): + result = extract_token_count_from_trace([root_span]) + assert result == 0 + + +def test_extract_token_count_from_trace_works_with_complete_spans(): + tokens_to_generate = 10 + model_str = "pharia-1-llm-7b-control" + model = Pharia1ChatModel( + name=model_str, client=MockClient(generated_tokens=tokens_to_generate) + ) + tracer = InMemoryTracer() + with tracer.span("root") as root: + model.complete(CompleteInput(prompt=model.to_instruct_prompt("test")), root) + model.complete(CompleteInput(prompt=model.to_instruct_prompt("test")), root) + + complete_trace = tracer.export_for_viewing() + + result = extract_token_count_from_trace(complete_trace) + assert result == tokens_to_generate * 2 + + +def test_extract_token_count_from_trace_works_chat(): + tokens_to_generate = 10 + model_str = "pharia-1-llm-7b-control" + model = Pharia1ChatModel( + name=model_str, client=MockClient(generated_tokens=tokens_to_generate) + ) + tracer = InMemoryTracer() + with tracer.span("root") as root: + model.generate_chat( + messages=[Message(role="user", content="dummy")], + response_prefix=None, + tracer=root, + ) + complete_trace = tracer.export_for_viewing() + + result = extract_token_count_from_trace(complete_trace) + assert result == tokens_to_generate