Skip to content

Commit

Permalink
feat(exporter): add GenericAdapter for span attribute conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
teocns committed Nov 26, 2024
1 parent 5a75eda commit 39798ee
Showing 1 changed file with 106 additions and 92 deletions.
198 changes: 106 additions & 92 deletions agentops/session/exporter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from __future__ import annotations

import asyncio
import functools
import json
import threading
from abc import ABC
from datetime import datetime, timezone
from decimal import ROUND_HALF_UP, Decimal
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, Sequence, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, Sequence, Union, cast
from uuid import UUID, uuid4
from weakref import WeakSet

Expand All @@ -15,6 +14,8 @@
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SpanExporter, SpanExportResult
from opentelemetry.trace.span import Span
from opentelemetry.util.types import AttributeValue
from termcolor import colored

from agentops.config import Configuration
Expand All @@ -25,6 +26,64 @@
from agentops.http_client import HttpClient, Response
from agentops.log_config import logger

if TYPE_CHECKING:
from agentops.session import Session


class GenericAdapter:
    """Translate arbitrary objects to and from flat span-attribute dictionaries.

    Outgoing attributes are namespaced under ``event.``; the complete object is
    additionally stored, serialized, under ``event.data`` so it can be restored
    by :meth:`from_span_attributes`.
    """

    @staticmethod
    def to_span_attributes(obj: Any) -> Dict[str, AttributeValue]:
        """Build span attributes from the public instance attributes of ``obj``.

        Args:
            obj: Any object. Only entries of its instance ``__dict__`` whose
                name does not start with ``_`` and whose value is not ``None``
                are exported individually.

        Returns:
            A dictionary with one ``event.<name>`` entry per exported
            attribute, an ``event.data`` entry holding ``safe_serialize(obj)``
            and, when ``obj.session_id`` is set, a ``session.id`` entry.
        """
        # Objects without an instance __dict__ (e.g. __slots__ classes or
        # builtins) have no per-attribute entries; they must not raise.
        instance_vars = getattr(obj, "__dict__", None) or {}

        span_attrs: Dict[str, AttributeValue] = {}
        for name, value in instance_vars.items():
            if name.startswith("_") or value is None:
                continue

            attr_key = f"event.{name}"
            if isinstance(value, (str, int, float, bool)):
                # Primitive values are stored unchanged.
                span_attrs[attr_key] = value
            elif isinstance(value, (datetime, UUID)):
                span_attrs[attr_key] = str(value)
            else:
                # Anything else is delegated to the module's serializer.
                span_attrs[attr_key] = safe_serialize(value)

        # Written after the loop, so it wins over a public attribute that
        # happens to be called "data".
        span_attrs["event.data"] = safe_serialize(obj)

        # A missing or unset session id is omitted instead of being exported
        # as the literal string "None".
        session_id = getattr(obj, "session_id", None)
        if session_id is not None:
            span_attrs["session.id"] = str(session_id)

        return span_attrs

    @staticmethod
    def from_span_attributes(attrs: Dict[str, AttributeValue]) -> Dict[str, Any]:
        """Rebuild an event-attribute dictionary from span attributes.

        Args:
            attrs: Span attributes, typically produced by
                :meth:`to_span_attributes`.

        Returns:
            Every ``event.<name>`` attribute keyed by ``<name>`` (``event.data``
            excluded), updated with the contents of ``event.data`` when that
            entry is a dict or JSON text encoding an object. Keys from
            ``event.data`` take precedence over the individual attributes.
        """
        prefix = "event."
        event_attrs: Dict[str, Any] = {
            key[len(prefix):]: value
            for key, value in attrs.items()
            if key.startswith(prefix) and key != "event.data"
        }

        raw_data = attrs.get("event.data")
        if isinstance(raw_data, dict):
            # Already structured: str() would yield a Python repr, not JSON.
            payload = raw_data
        elif raw_data is None:
            payload = None
        else:
            try:
                payload = json.loads(raw_data if isinstance(raw_data, str) else str(raw_data))
            except (json.JSONDecodeError, TypeError):
                # Deliberately best-effort: an unparsable payload is ignored
                # and the individual event.* attributes are still returned.
                payload = None

        # Only a JSON object can be merged; dict.update() on a string or list
        # would either raise or inject meaningless keys.
        if isinstance(payload, dict):
            event_attrs.update(payload)

        return event_attrs


class SessionProtocol(Protocol):
"""
Expand All @@ -36,10 +95,6 @@ class SessionProtocol(Protocol):
config: Configuration


if TYPE_CHECKING:
from agentops.session import Session


class SessionExporter(SpanExporter):
"""
Manages publishing events for Session
Expand All @@ -53,26 +108,6 @@ class SessionExporter(SpanExporter):
- According to the OpenTelemetry Python documentation, Resource should be initialized once per application and shared across all telemetry (traces, metrics, logs).
- Each Session gets its own Tracer (with session-specific context)
- Allow multiple sessions to share the provider while maintaining their own context
:: Resource
''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
Captures information about the entity producing telemetry as Attributes.
For example, a process producing telemetry that is running in a container
on Kubernetes has a process name, a pod name, a namespace, and possibly
a deployment name. All these attributes can be included in the Resource.
''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
The key insight from the documentation is:
- Resource represents the entity producing telemetry - in our case, that's the AgentOps SDK application itself
- Session-specific information should be attributes on the spans themselves
- A Resource is meant to identify the service/process/application
- Sessions are units of work within that application
- The documentation example about "process name, pod name, namespace" refers to where the code is running, not the work it's doing
"""

def __init__(self, session: Session, **kwargs):
Expand All @@ -93,56 +128,47 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

events = []
for span in spans:
# Safely get attributes with defaults
# Get span attributes and convert back to event data using adapter
attributes = span.attributes or {}
event_data = {}
try:
data_str = attributes.get("event.data", "{}")
if isinstance(data_str, str):
event_data = json.loads(data_str)
elif isinstance(data_str, dict):
event_data = data_str
except json.JSONDecodeError:
logger.error("Failed to parse event data JSON")
event_data = {}

# Safely get timestamps
event_attrs = GenericAdapter.from_span_attributes(attributes)

# Get current time as fallback
current_time = datetime.now(timezone.utc).isoformat()
init_timestamp = attributes.get("event.timestamp", current_time)
end_timestamp = attributes.get("event.end_timestamp", current_time)

# Safely get event ID
event_id = attributes.get("event.id")
if not event_id:
event_id = str(uuid4())
logger.warning("Event ID not found, generating new one but this shouldn't happen")
# Build event with required fields
event = {
"id": event_attrs.get("id", str(uuid4())),
"event_type": span.name,
"init_timestamp": event_attrs.get("timestamp", current_time),
"end_timestamp": event_attrs.get("end_timestamp", current_time),
"session_id": str(self.session.session_id),
}

# Format event data based on event type
# Add formatted data based on event type
if span.name == "actions":
formatted_data = {
"action_type": event_data.get("action_type", event_data.get("name", "unknown_action")),
"params": event_data.get("params", {}),
"returns": event_data.get("returns"),
}
event.update(
{
"action_type": event_attrs.get(
"action_type", event_attrs.get("name", "unknown_action")
),
"params": event_attrs.get("params", {}),
"returns": event_attrs.get("returns"),
}
)
elif span.name == "tools":
formatted_data = {
"name": event_data.get("name", event_data.get("tool_name", "unknown_tool")),
"params": event_data.get("params", {}),
"returns": event_data.get("returns"),
}
event.update(
{
"name": event_attrs.get("name", event_attrs.get("tool_name", "unknown_tool")),
"params": event_attrs.get("params", {}),
"returns": event_attrs.get("returns"),
}
)
else:
formatted_data = event_data

events.append(
{
"id": event_id,
"event_type": span.name,
"init_timestamp": init_timestamp,
"end_timestamp": end_timestamp,
**formatted_data,
"session_id": str(self.session.session_id),
}
)
# For other event types, include all data except what we already used
data = {k: v for k, v in event_attrs.items() if k not in ["id", "timestamp", "end_timestamp"]}
event.update(data)

events.append(event)

# Only make HTTP request if we have events and not shutdown
if events:
Expand All @@ -168,27 +194,25 @@ def shutdown(self) -> None:
self._shutdown.set()


class SessionExporterMixIn(SessionProtocol):
class SessionExporterMixIn(SessionProtocol, ABC):
"""Mixin class that provides OpenTelemetry exporting capabilities to Session"""

_exporter: SessionExporter
_tracer_provider: TracerProvider
_span_processor: BatchSpanProcessor
_tracer: trace.Tracer

def __init__(self):
"""Initialize OpenTelemetry components"""
self._span_processor = None
self._tracer_provider = None
self._exporter = None
self._shutdown = threading.Event()

# Initialize other attributes that might be accessed during cleanup
self._locks = getattr(self, "_locks", {})
self.is_running = getattr(self, "is_running", False)

# Initialize OTEL components
self._setup_otel()

def _setup_otel(self):
"""Set up OpenTelemetry components"""
# Create exporter
self._exporter = SessionExporter(self)
self._exporter = SessionExporter(self) # type: ignore

# Create and configure tracer provider
self._tracer_provider = TracerProvider(resource=Resource.create({SERVICE_NAME: "agentops"}))
Expand All @@ -215,17 +239,9 @@ def _record_otel_event(self, event: Union[Event, ErrorEvent], flush_now: bool =
name=str(event.event_type),
kind=trace.SpanKind.INTERNAL,
) as span:
# Set span attributes using safe_serialize for event data
span.set_attributes(
{
"event.id": str(event.id),
"event.type": str(event.event_type),
"event.timestamp": event.init_timestamp,
"event.end_timestamp": event.end_timestamp,
"event.data": safe_serialize(event),
"session.id": str(self.session_id),
}
)
# Use GenericAdapter to convert event to span attributes
span_attributes = GenericAdapter.to_span_attributes(event)
span.set_attributes(span_attributes)

finally:
detach(token)
Expand All @@ -241,5 +257,3 @@ def __del__(self):
self._span_processor.shutdown()
except Exception as e:
logger.warning(f"Error during span processor cleanup: {e}")

# ... rest of the class implementation ...

0 comments on commit 39798ee

Please sign in to comment.