Skip to content

Commit

Permalink
feat(telemetry): add error event handling in processor
Browse files Browse the repository at this point in the history
  • Loading branch information
teocns committed Jan 7, 2025
1 parent 60a4f61 commit e12231a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
22 changes: 14 additions & 8 deletions agentops/telemetry/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from agentops.helpers import filter_unjsonable, get_ISO_time
from agentops.telemetry.converter import EventToSpanConverter
from agentops.event import ErrorEvent


class EventProcessor:
Expand Down Expand Up @@ -63,18 +64,28 @@ def process_event(self, event: Any, tags: Optional[List[str]] = None, flush_now:
if not hasattr(event, "end_timestamp") or event.end_timestamp is None:
event.end_timestamp = get_ISO_time()
if not hasattr(event, "session_id"):
event.session_id = self.session_id # Ensure session_id is set
event.session_id = self.session_id

# Get current span if it exists
current_span = trace.get_current_span()

# Create session context
token = set_value("session.id", str(self.session_id))
try:
token = attach(token)

# Get span definitions from converter
span_definitions = EventToSpanConverter.convert_event(event)
primary_span = None

# Create spans based on definitions
# If we have a current span and this is an error event, update the current span
if isinstance(event, ErrorEvent) and current_span and current_span.is_recording():
# Update current span with error attributes
for key, value in span_definitions[0].attributes.items():
current_span.set_attribute(key, value)
return current_span

# Otherwise create new spans as before
primary_span = None
for span_def in span_definitions:
context = None
if span_def.parent_span_id and primary_span:
Expand All @@ -87,10 +98,6 @@ def process_event(self, event: Any, tags: Optional[List[str]] = None, flush_now:
"session.tags": ",".join(tags) if tags else "",
"event.timestamp": event.init_timestamp,
"event.end_timestamp": event.end_timestamp,
"event.data": json.dumps({
"session_id": str(self.session_id), # Include in event data
**self._format_event_data(event)
})
})

with self._tracer.start_span(
Expand All @@ -101,7 +108,6 @@ def process_event(self, event: Any, tags: Optional[List[str]] = None, flush_now:
) as span:
if not primary_span:
primary_span = span
# Update event counts for primary span
if event.event_type in self.event_counts:
self.event_counts[event.event_type] += 1

Expand Down
8 changes: 4 additions & 4 deletions tests/telemetry/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ def test_process_llm_event(self, processor, mock_llm_event, test_exporter):

def test_process_error_event(self, processor, mock_error_event, test_exporter):
"""Test processing an error event creates correct span"""
with processor._tracer.start_as_current_span("error") as span: # Changed from "errors"
# Set error status while span is active
# This creates span #1
with processor._tracer.start_as_current_span("error") as span:
span.set_status(Status(StatusCode.ERROR))

# Process the event
# This creates span #2
processor.process_event(mock_error_event)

processor._tracer_provider.force_flush()

# Verify exported span
# Test expects only 1 span
assert len(test_exporter.exported_spans) == 1
error_span = test_exporter.exported_spans[0]

Expand Down

0 comments on commit e12231a

Please sign in to comment.