Skip to content

Commit

Permalink
test(tracer): Add OpenTelemetryTracer test setup
Browse files Browse the repository at this point in the history
TASK: IL-293
  • Loading branch information
MerlinKallenbornTNG authored and FlorianSchepersAA committed Mar 26, 2024
1 parent 85eed4e commit 32bc6ba
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 23 deletions.
10 changes: 10 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,13 @@ services:
- "6900:6900"
environment:
ARGILLA_ELASTICSEARCH: "http://argilla-elastic-search:9200"
all-in-one: #TODO rename
container_name: jaeger_1_35
environment:
COLLECTOR_OTLP_ENABLED: "true"

ports:
- "4317:4317"
- "4318:4318"
- "16686:16686"
image: jaegertracing/all-in-one:1.35
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pycountry = "23.12.11"
rouge = "^1.0.1"
opentelemetry-api = "^1.22.0"
opentelemetry-sdk = "^1.22.0"
opentelemetry-exporter-otlp-proto-http = "1.23.0"
huggingface-hub = "^0.21.4"

[tool.poetry.group.dev.dependencies]
Expand Down
122 changes: 99 additions & 23 deletions tests/core/test_tracer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import json
import time
from pathlib import Path
from time import sleep
from typing import Any
from unittest.mock import Mock

import pytest
from aleph_alpha_client import Prompt
import requests
from aleph_alpha_client import Prompt, Text
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import get_tracer
from pytest import fixture, mark

Expand Down Expand Up @@ -31,6 +41,18 @@ def complete(
return luminous_control_model.complete_task()


def basic_open_telemetry_tracer(service_name: str)->tuple[str, OpenTelemetryTracer, TracerProvider]:
url = "http://localhost:16686/api/traces?limit=1&service=" + service_name
resource = Resource.create({SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
openTracer = trace.get_tracer("intelligence-layer")
tracer = OpenTelemetryTracer(openTracer)
return (url, tracer, provider)


def test_composite_tracer_id_consistent_across_children(
file_tracer: FileTracer,
) -> None:
Expand Down Expand Up @@ -167,14 +189,22 @@ class TestTask(Task[str, str]):
sub_task = TestSubTask()

def do_run(self, input: str, task_span: TaskSpan) -> str:
with task_span.span("span") as sub_span:
sub_span.log("message", "a value")
with task_span.span("SubSpan1") as sub_span:
sub_span.log("This is a sub span", "subspan value")
self.sub_task.run(None, sub_span)
self.sub_task.run(None, task_span)

return "output"


class OpentTelTestTask(Task[str, str]):

def do_run(self, input: str, task_span: TaskSpan) -> str:
with task_span.span("SubSpan1") as sub_span:
sub_span.log("This is a sub span", "subspan value")
return "output"


@fixture
def file_tracer(tmp_path: Path) -> FileTracer:
return FileTracer(tmp_path / "log.log")
Expand Down Expand Up @@ -225,26 +255,72 @@ def test_file_tracer_raises_non_log_entry_failed_exceptions(
)


@mark.skip(
"Does not assert anything, here to show how you can use the OpenTelemetry Tracer."
)
def test_open_telemetry_tracer() -> None:
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
#########################################################################################################
def test_open_telemetry_tracer_id_same_among_child_spans() -> None:
service_name = "test-service"
tracing_service, tracer, provider = basic_open_telemetry_tracer(service_name)
TestTask().run("test-input", tracer)

# Service name is required for most backends,
# and although it's not necessary for console export,
# it's good to set service name anyways.
resource = Resource(attributes={SERVICE_NAME: "your-service-name"})
traces = get_traces_from_jaeger(tracing_service)

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
expectedTraceId = traces[0]["traceID"]

input = "input"
openTracer = get_tracer("intelligence-layer")
TestTask().run(input, CompositeTracer([OpenTelemetryTracer(openTracer)]))
provider.force_flush()
assert len(traces) == 1
spans = traces[0]["spans"]
assert len(spans) == 4
for span in spans:
assert span["traceID"] == expectedTraceId

task_span = spans[0]
root_spans = [span for span in spans if span["references"] == []]
assert len(root_spans) == 1
task_span = root_spans[0]
tags = task_span["tags"]
open_tel_input_tag = [tag for tag in tags if tag["key"] == "input"]
assert len(open_tel_input_tag) == 1
open_tel_output_tag = [tag for tag in tags if tag["key"] == "output"]
assert len(open_tel_output_tag) == 1



# def test_open_telemetry_tracer_task_automatically_logs_input_and_output(
# complete: Task[CompleteInput, CompleteOutput]
# ) -> None:
# service_name = "test_open_telemetry_tracer_task_automatically_logs_input_and_output_service"
# tracing_service, tracer, provider = basic_open_telemetry_tracer(service_name)
# input = CompleteInput(prompt=Prompt.from_text("test"))
# complete.run(input=input, tracer=tracer)

# traces = get_traces_from_jaeger(tracing_service)

# assert len(traces) == 1
# spans = traces[0]["spans"]
# assert len(spans) == 1
# task_span = spans[0]
# assert task_span["operationName"] == type(complete).__name__
# tags = task_span["tags"]
# open_tel_input_tag = [tag for tag in tags if tag["key"] == "input"]
# assert len(open_tel_input_tag) == 1
# open_tel_output_tag = [tag for tag in tags if tag["key"] == "output"]
# assert len(open_tel_output_tag) == 1







def get_traces_from_jaeger(tracing_service: str)-> Any:
request_timeout_in_seconds = 10
response = requests.get(tracing_service)
response_text = json.loads(response.text)
traces = response_text["data"]

request_start = time.time()
while len(traces) == 0 and time.time() - request_start < request_timeout_in_seconds:
response = requests.get(tracing_service)
response_text = json.loads(response.text)
traces = response_text["data"]
sleep(0.1)

return traces

0 comments on commit 32bc6ba

Please sign in to comment.