Skip to content

Commit

Permalink
test(tracer): Add OpenTelemetryTracer test setup
Browse files Browse the repository at this point in the history
TASK: IL-293
  • Loading branch information
MerlinKallenbornTNG authored and FlorianSchepersAA committed Mar 27, 2024
1 parent 85eed4e commit 8794244
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 27 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/github-actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ jobs:
- "6900:6900"
env:
ARGILLA_ELASTICSEARCH: "http://argilla-elastic-search:9200"
open-telemetry-trace-service:
env:
COLLECTOR_OTLP_ENABLED: "true"
ports:
- "4317:4317"
- "4318:4318"
- "16686:16686"
image: jaegertracing/all-in-one:1.35

steps:
- name: Checkout repository
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,12 @@ services:
- "6900:6900"
environment:
ARGILLA_ELASTICSEARCH: "http://argilla-elastic-search:9200"
open-telemetry-trace-service:
container_name: jaeger_1_35
environment:
COLLECTOR_OTLP_ENABLED: "true"
ports:
- "4317:4317"
- "4318:4318"
- "16686:16686"
image: jaegertracing/all-in-one:1.35
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pycountry = "23.12.11"
rouge = "^1.0.1"
opentelemetry-api = "^1.22.0"
opentelemetry-sdk = "^1.22.0"
opentelemetry-exporter-otlp-proto-http = "1.23.0"
huggingface-hub = "^0.21.4"

[tool.poetry.group.dev.dependencies]
Expand Down
138 changes: 111 additions & 27 deletions tests/core/test_tracer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import json
import time
from pathlib import Path
from time import sleep
from typing import Any, Optional
from unittest.mock import Mock

import pytest
import requests
from aleph_alpha_client import Prompt
from opentelemetry.trace import get_tracer
from pytest import fixture, mark
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from pytest import fixture

from intelligence_layer.core import (
CompleteInput,
Expand All @@ -31,6 +40,19 @@ def complete(
return luminous_control_model.complete_task()


@fixture
def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]:
    """Build an OpenTelemetryTracer that exports spans through an OTLP HTTP exporter.

    Returns:
        A pair of (trace query URL for the "test-service" service on
        localhost:16686, tracer wrapping the "intelligence-layer" tracer).
    """
    service_name = "test-service"

    # Register a provider whose resource carries the service name, so the
    # exported spans can be looked up per service via the query URL below.
    tracer_provider = TracerProvider(
        resource=Resource.create({SERVICE_NAME: service_name})
    )
    trace.set_tracer_provider(tracer_provider)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))

    query_url = "http://localhost:16686/api/traces?limit=-1&service=" + service_name
    wrapped_tracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer"))
    return (query_url, wrapped_tracer)


def test_composite_tracer_id_consistent_across_children(
file_tracer: FileTracer,
) -> None:
Expand Down Expand Up @@ -167,14 +189,22 @@ class TestTask(Task[str, str]):
sub_task = TestSubTask()

def do_run(self, input: str, task_span: TaskSpan) -> str:
# NOTE(review): this view presumably interleaves the removed and the added side
# of a diff hunk, so both `with` headers below may not coexist in the real file
# -- confirm against the repository before relying on this listing.
with task_span.span("span") as sub_span:
sub_span.log("message", "a value")
# Opens a child span named "SubSpan1" on the task span and logs one entry on it.
with task_span.span("SubSpan1") as sub_span:
sub_span.log("This is a sub span", "subspan value")
# The sub task is run twice: once with the child span and once with the task span.
self.sub_task.run(None, sub_span)
self.sub_task.run(None, task_span)

return "output"


class OpentTelTestTask(Task[str, str]):
    """Minimal task that opens one sub span, logs one entry on it, and returns "output"."""

    def do_run(self, input: str, task_span: TaskSpan) -> str:
        """Log a single entry inside a "SubSpan1" child span and return "output"."""
        result = "output"
        with task_span.span("SubSpan1") as child_span:
            child_span.log("This is a sub span", "subspan value")
        return result


@fixture
def file_tracer(tmp_path: Path) -> FileTracer:
    """Provide a FileTracer that writes to ``log.log`` inside pytest's ``tmp_path``."""
    log_file = tmp_path / "log.log"
    return FileTracer(log_file)
Expand Down Expand Up @@ -225,26 +255,80 @@ def test_file_tracer_raises_non_log_entry_failed_exceptions(
)


@mark.skip(
    "Does not assert anything, here to show how you can use the OpenTelemetry Tracer."
)
def test_open_telemetry_tracer() -> None:
    """Show how to wire an OpenTelemetryTracer to a console span exporter.

    The test is skipped and asserts nothing; it only serves as a usage example.
    """
    from opentelemetry import trace
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

    # Most backends require a service name. Console export does not,
    # but setting one anyway is good practice.
    tracer_provider = TracerProvider(
        resource=Resource(attributes={SERVICE_NAME: "your-service-name"})
    )
    tracer_provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    trace.set_tracer_provider(tracer_provider)

    task_input = "input"
    otel_tracer = get_tracer("intelligence-layer")
    TestTask().run(task_input, CompositeTracer([OpenTelemetryTracer(otel_tracer)]))
    # Push any buffered spans out before the test returns.
    tracer_provider.force_flush()
def test_open_telemetry_tracer_check_consistency_in_trace_ids(
    open_telemetry_tracer: tuple[str, OpenTelemetryTracer]
) -> None:
    """Every span exported for one task run must carry the trace id passed to ``run``."""
    query_url, tracer = open_telemetry_tracer
    expected_trace_id = tracer.ensure_id(None)

    TestTask().run("test-input", tracer, trace_id=expected_trace_id)
    recorded_trace = _get_trace_by_id(query_url, expected_trace_id)

    assert recorded_trace is not None
    assert _get_trace_id_from_trace(recorded_trace) == expected_trace_id

    recorded_spans = recorded_trace["spans"]
    assert len(recorded_spans) == 4
    for recorded_span in recorded_spans:
        assert _get_trace_id_from_span(recorded_span) == expected_trace_id


def test_open_telemetry_tracer_loggs_input_and_output(
    open_telemetry_tracer: tuple[str, OpenTelemetryTracer],
    complete: Task[CompleteInput, CompleteOutput],
) -> None:
    """The root span of a task run must carry exactly one "input" and one "output" tag."""
    tracing_service, tracer = open_telemetry_tracer
    complete_input = CompleteInput(prompt=Prompt.from_text("test"))
    trace_id = tracer.ensure_id(None)

    complete.run(complete_input, tracer, trace_id)
    recorded_trace = _get_trace_by_id(tracing_service, trace_id)

    assert recorded_trace is not None
    spans = recorded_trace["spans"]
    # `spans is not []` compared identity against a fresh list and could never
    # fail; assert on emptiness instead.
    assert spans

    # The task span is the one without references to another span.
    task_span = next((span for span in spans if span["references"] == []), None)
    assert task_span is not None

    tags = task_span["tags"]
    open_tel_input_tag = [tag for tag in tags if tag["key"] == "input"]
    assert len(open_tel_input_tag) == 1
    open_tel_output_tag = [tag for tag in tags if tag["key"] == "output"]
    assert len(open_tel_output_tag) == 1


def _get_trace_by_id(tracing_service: str, wanted_trace_id: str) -> Optional[Any]:
    """Poll the tracing service until a trace with the wanted id is available.

    Args:
        tracing_service: URL that returns the currently stored traces.
        wanted_trace_id: Trace id to look for.

    Returns:
        The matching trace as returned by the service, or None if no such
        trace showed up within the timeout.
    """
    request_timeout_in_seconds = 10
    poll_interval_in_seconds = 0.1

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + request_timeout_in_seconds
    while True:
        # The service is always queried at least once, even with an expired deadline.
        for current_trace in _get_current_traces(tracing_service) or ():
            if _get_trace_id_from_trace(current_trace) == wanted_trace_id:
                # The original first pass returned the imported `trace` module
                # here instead of the trace that was found.
                return current_trace
        if time.monotonic() >= deadline:
            return None
        sleep(poll_interval_in_seconds)


def _get_current_traces(tracing_service: str) -> Any:
    """Fetch the traces currently exposed by the tracing service.

    Args:
        tracing_service: URL to query.

    Returns:
        The value stored under the "data" key of the JSON response body.

    Raises:
        requests.exceptions.Timeout: If the service does not answer in time.
    """
    # Without a timeout an unresponsive service would block the test run forever.
    response = requests.get(tracing_service, timeout=10)
    payload = json.loads(response.text)
    return payload["data"]


def _get_trace_id_from_trace(trace: Any) -> Optional[str]:
spans = trace["spans"]
if not spans:
return None
return _get_trace_id_from_span(spans[0])


def _get_trace_id_from_span(span: Any) -> Optional[str]:
tags = span["tags"]
if not tags:
return None
trace_id_tag = next(tag for tag in tags if tag["key"] == "trace_id")
return str(trace_id_tag["value"])

0 comments on commit 8794244

Please sign in to comment.