Skip to content

Commit

Permalink
Rework OpenTelemetryTracer
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasKoehneckeAA committed May 23, 2024
1 parent 4bcf987 commit 65e50dc
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 115 deletions.
45 changes: 18 additions & 27 deletions src/intelligence_layer/core/tracer/open_telemetry_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from opentelemetry.trace import set_span_in_context

from intelligence_layer.core.tracer.tracer import (
Context,
ExportedSpan,
LogEntry,
PydanticSerializable,
Span,
TaskSpan,
Expand All @@ -21,60 +21,56 @@ class OpenTelemetryTracer(Tracer):
"""A `Tracer` that uses open telemetry."""

def __init__(self, tracer: OpenTTracer) -> None:
self.O_tracer = tracer
self._tracer = tracer

def span(
self,
name: str,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "OpenTelemetrySpan":
trace_id = self.ensure_id(trace_id)
tracer_span = self._tracer.start_span(
name,
attributes={"trace_id": trace_id},
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
self._tracer
return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id)
return OpenTelemetrySpan(tracer_span, self._tracer, token, self.context)

def task_span(
self,
task_name: str,
input: PydanticSerializable,
timestamp: Optional[datetime] = None,
trace_id: Optional[str] = None,
) -> "OpenTelemetryTaskSpan":
trace_id = self.ensure_id(trace_id)

tracer_span = self._tracer.start_span(
task_name,
attributes={"input": _serialize(input), "trace_id": trace_id},
attributes={"input": _serialize(input)},
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id)
return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, self.context)

def export_for_viewing(self) -> Sequence[ExportedSpan]:
raise NotImplementedError("The OpenTelemetryTracer does not support export for viewing, as it can not acces its own traces.")
raise NotImplementedError(
"The OpenTelemetryTracer does not support export for viewing, as it can not access its own traces."
)


class OpenTelemetrySpan(Span, OpenTelemetryTracer):
"""A `Span` created by `OpenTelemetryTracer.span`."""

end_timestamp: Optional[datetime] = None

def id(self) -> str:
return self._trace_id

def __init__(
self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str
self,
span: OpenTSpan,
tracer: OpenTTracer,
token: object,
context: Optional[Context] = None,
) -> None:
super().__init__(tracer)
OpenTelemetryTracer.__init__(self, tracer)
Span.__init__(self, context=context)
self.open_ts_span = span
self._token = token
self._trace_id = trace_id

def log(
self,
Expand All @@ -84,28 +80,23 @@ def log(
) -> None:
self.open_ts_span.add_event(
message,
{"value": _serialize(value), "trace_id": self.id()},
{"value": _serialize(value)},
None if not timestamp else _open_telemetry_timestamp(timestamp),
)

def end(self, timestamp: Optional[datetime] = None) -> None:
super().end(timestamp)
detach(self._token)
self.open_ts_span.end(
_open_telemetry_timestamp(timestamp) if timestamp is not None else None
)
super().end(timestamp)


class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan):
"""A `TaskSpan` created by `OpenTelemetryTracer.task_span`."""

output: Optional[PydanticSerializable] = None

def __init__(
self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str
) -> None:
super().__init__(span, tracer, token, trace_id)

def record_output(self, output: PydanticSerializable) -> None:
self.open_ts_span.set_attribute("output", _serialize(output))

Expand Down
57 changes: 57 additions & 0 deletions tests/core/test_temp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import contextlib
import json
import os
import time
from pathlib import Path
from typing import Any, Iterator, Optional
from unittest.mock import Mock

import pytest
import requests
from aleph_alpha_client import Prompt
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from pytest import fixture

from intelligence_layer.core import (
CompleteInput,
CompleteOutput,
CompositeTracer,
FileTracer,
InMemorySpan,
InMemoryTaskSpan,
InMemoryTracer,
LogEntry,
LuminousControlModel,
OpenTelemetryTracer,
Task,
TaskSpan,
utc_now,
)
from intelligence_layer.core.tracer.persistent_tracer import TracerLogEntryFailed
from intelligence_layer.core.tracer.tracer import ErrorValue

@fixture
def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]:
service_name = "test-service"
url = "http://localhost:16686/api/traces?service=" + service_name
resource = Resource.create({SERVICE_NAME: service_name})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
openTracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer"))
return (url, openTracer)


def test_temp_1(open_telemetry_tracer):
print("test_1")

def test_temp_2(open_telemetry_tracer):
print("test_2")

def test_temp_3(open_telemetry_tracer):
print("test_3")
Loading

0 comments on commit 65e50dc

Please sign in to comment.