From 8794244cada92f5b4793c60da38d2163db6967a8 Mon Sep 17 00:00:00 2001 From: Merlin Kallenborn Date: Wed, 13 Mar 2024 15:43:19 +0100 Subject: [PATCH] test(tracer): Add OpenTelemetryTracer test setup TASK: IL-293 --- .github/workflows/github-actions.yml | 8 ++ docker-compose.yaml | 9 ++ pyproject.toml | 1 + tests/core/test_tracer.py | 138 +++++++++++++++++++++------ 4 files changed, 129 insertions(+), 27 deletions(-) diff --git a/.github/workflows/github-actions.yml b/.github/workflows/github-actions.yml index f5143b69c..8bdddb7f2 100644 --- a/.github/workflows/github-actions.yml +++ b/.github/workflows/github-actions.yml @@ -111,6 +111,14 @@ jobs: - "6900:6900" env: ARGILLA_ELASTICSEARCH: "http://argilla-elastic-search:9200" + open-telemetry-trace-service: + env: + COLLECTOR_OTLP_ENABLED: "true" + ports: + - "4317:4317" + - "4318:4318" + - "16686:16686" + image: jaegertracing/all-in-one:1.35 steps: - name: Checkout repository diff --git a/docker-compose.yaml b/docker-compose.yaml index 9f73ffc23..2edadc074 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -12,3 +12,12 @@ services: - "6900:6900" environment: ARGILLA_ELASTICSEARCH: "http://argilla-elastic-search:9200" + open-telemetry-trace-service: + container_name: jaeger_1_35 + environment: + COLLECTOR_OTLP_ENABLED: "true" + ports: + - "4317:4317" + - "4318:4318" + - "16686:16686" + image: jaegertracing/all-in-one:1.35 diff --git a/pyproject.toml b/pyproject.toml index c8a4cf951..c2f6d7cea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ pycountry = "23.12.11" rouge = "^1.0.1" opentelemetry-api = "^1.22.0" opentelemetry-sdk = "^1.22.0" +opentelemetry-exporter-otlp-proto-http = "1.23.0" huggingface-hub = "^0.21.4" [tool.poetry.group.dev.dependencies] diff --git a/tests/core/test_tracer.py b/tests/core/test_tracer.py index 7c9a319d0..22c1f30eb 100644 --- a/tests/core/test_tracer.py +++ b/tests/core/test_tracer.py @@ -1,10 +1,19 @@ +import json +import time from pathlib import Path +from time import sleep +from typing import Any, Optional from unittest.mock import Mock import pytest +import requests from aleph_alpha_client import Prompt -from opentelemetry.trace import get_tracer -from pytest import fixture, mark +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from pytest import fixture from intelligence_layer.core import ( CompleteInput, @@ -31,6 +40,19 @@ def complete( return luminous_control_model.complete_task() +@fixture +def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]: + service_name = "test-service" + url = "http://localhost:16686/api/traces?limit=-1&service=" + service_name + resource = Resource.create({SERVICE_NAME: service_name}) + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + processor = BatchSpanProcessor(OTLPSpanExporter()) + provider.add_span_processor(processor) + openTracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer")) + return (url, openTracer) + + def test_composite_tracer_id_consistent_across_children( file_tracer: FileTracer, ) -> None: @@ -167,14 +189,22 @@ class TestTask(Task[str, str]): sub_task = TestSubTask() def do_run(self, input: str, task_span: TaskSpan) -> str: - with task_span.span("span") as sub_span: - sub_span.log("message", "a value") + with task_span.span("SubSpan1") as sub_span: + sub_span.log("This is a sub span", "subspan value") self.sub_task.run(None, sub_span) self.sub_task.run(None, task_span) return "output" +class OpentTelTestTask(Task[str, str]): + + def do_run(self, input: str, task_span: TaskSpan) -> str: + with task_span.span("SubSpan1") as sub_span: + sub_span.log("This is a sub span", "subspan value") + return "output" + + @fixture def file_tracer(tmp_path: Path) -> FileTracer: return FileTracer(tmp_path / "log.log") @@ -225,26 +255,80 @@ def test_file_tracer_raises_non_log_entry_failed_exceptions( ) -@mark.skip( - "Does not assert anything, here to show how you can use the OpenTelemetry Tracer." -) -def test_open_telemetry_tracer() -> None: - from opentelemetry import trace - from opentelemetry.sdk.resources import SERVICE_NAME, Resource - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter - - # Service name is required for most backends, - # and although it's not necessary for console export, - # it's good to set service name anyways. - resource = Resource(attributes={SERVICE_NAME: "your-service-name"}) - - provider = TracerProvider(resource=resource) - processor = BatchSpanProcessor(ConsoleSpanExporter()) - provider.add_span_processor(processor) - trace.set_tracer_provider(provider) - - input = "input" - openTracer = get_tracer("intelligence-layer") - TestTask().run(input, CompositeTracer([OpenTelemetryTracer(openTracer)])) - provider.force_flush() +def test_open_telemetry_tracer_check_consistency_in_trace_ids( + open_telemetry_tracer: tuple[str, OpenTelemetryTracer] +) -> None: + tracing_service, tracer = open_telemetry_tracer + expected_trace_id = tracer.ensure_id(None) + TestTask().run("test-input", tracer, trace_id=expected_trace_id) + trace = _get_trace_by_id(tracing_service, expected_trace_id) + + assert trace is not None + assert _get_trace_id_from_trace(trace) == expected_trace_id + spans = trace["spans"] + assert len(spans) == 4 + for span in spans: + assert _get_trace_id_from_span(span) == expected_trace_id + + +def test_open_telemetry_tracer_loggs_input_and_output( + open_telemetry_tracer: tuple[str, OpenTelemetryTracer], + complete: Task[CompleteInput, CompleteOutput], +) -> None: + tracing_service, tracer = open_telemetry_tracer + input = CompleteInput(prompt=Prompt.from_text("test")) + trace_id = tracer.ensure_id(None) + complete.run(input, tracer, trace_id) + trace = _get_trace_by_id(tracing_service, trace_id) + assert trace is not None + spans = trace["spans"] + assert spans is not [] + task_span = next((span for span in spans if span["references"] == []), None) + assert task_span is not None + tags = task_span["tags"] + open_tel_input_tag = [tag for tag in tags if tag["key"] == "input"] + assert len(open_tel_input_tag) == 1 + open_tel_output_tag = [tag for tag in tags if tag["key"] == "output"] + assert len(open_tel_output_tag) == 1 + + +def _get_trace_by_id(tracing_service: str, wanted_trace_id: str) -> Optional[Any]: + request_timeout_in_seconds = 10 + traces = _get_current_traces(tracing_service) + if traces: + for current_trace in traces: + trace_id = _get_trace_id_from_trace(current_trace) + if trace_id == wanted_trace_id: + return trace + + request_start = time.time() + while time.time() - request_start < request_timeout_in_seconds: + traces = _get_current_traces(tracing_service) + if traces: + for current_trace in traces: + trace_id = _get_trace_id_from_trace(current_trace) + if trace_id == wanted_trace_id: + return current_trace + sleep(0.1) + return None + + +def _get_current_traces(tracing_service: str) -> Any: + response = requests.get(tracing_service) + response_text = json.loads(response.text) + return response_text["data"] + + +def _get_trace_id_from_trace(trace: Any) -> Optional[str]: + spans = trace["spans"] + if not spans: + return None + return _get_trace_id_from_span(spans[0]) + + +def _get_trace_id_from_span(span: Any) -> Optional[str]: + tags = span["tags"] + if not tags: + return None + trace_id_tag = next(tag for tag in tags if tag["key"] == "trace_id") + return str(trace_id_tag["value"])