From f8494b658aca46eab466d3191b73eee287f4e55c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 12 Dec 2024 02:51:39 +0000 Subject: [PATCH] fix: improve HTTP client error handling and JWT reauthorization Co-Authored-By: Alex Reibman --- agentops/http_client.py | 90 ++--- agentops/session.py | 725 ++++++++++++++++++++++------------------ 2 files changed, 457 insertions(+), 358 deletions(-) diff --git a/agentops/http_client.py b/agentops/http_client.py index 0499cd2c..3e4f0227 100644 --- a/agentops/http_client.py +++ b/agentops/http_client.py @@ -3,45 +3,17 @@ import requests from requests.adapters import HTTPAdapter, Retry import json +import logging from .exceptions import ApiServerException from .enums import HttpStatus +from .response import Response JSON_HEADER = {"Content-Type": "application/json; charset=UTF-8", "Accept": "*/*"} retry_config = Retry(total=5, backoff_factor=0.1) - -class Response: - def __init__(self, status: HttpStatus = HttpStatus.UNKNOWN, body: Optional[dict] = None): - self.status: HttpStatus = status - self.code: int = status.value - self.body = body if body else {} - - def parse(self, res: requests.models.Response): - res_body = res.json() - self.code = res.status_code - self.status = self.get_status(self.code) - self.body = res_body - return self - - @staticmethod - def get_status(code: int) -> HttpStatus: - if 200 <= code < 300: - return HttpStatus.SUCCESS - elif code == 429: - return HttpStatus.TOO_MANY_REQUESTS - elif code == 413: - return HttpStatus.PAYLOAD_TOO_LARGE - elif code == 408: - return HttpStatus.TIMEOUT - elif code == 401: - return HttpStatus.INVALID_API_KEY - elif 400 <= code < 500: - return HttpStatus.INVALID_REQUEST - elif code >= 500: - return HttpStatus.FAILED - return HttpStatus.UNKNOWN +logger = logging.getLogger(__name__) class HttpClient: @@ -93,23 +65,31 @@ def _prepare_headers( } headers = {} + + # Add default JSON headers with proper casing for k, v in JSON_HEADER.items(): lower_k = k.lower() - headers[proper_case.get(lower_k, k)] = v + proper_k = proper_case.get(lower_k, k) + headers[proper_k] = v + # Add API key with proper casing if api_key is not None: - headers[proper_case["x-agentops-api-key"]] = api_key + headers["X-AgentOps-Api-Key"] = api_key + # Add parent key with proper casing if parent_key is not None: - headers[proper_case["x-agentops-parent-key"]] = parent_key + headers["X-AgentOps-Parent-Key"] = parent_key + # Add JWT with proper casing if jwt is not None: - headers[proper_case["authorization"]] = f"Bearer {jwt}" + headers["Authorization"] = f"Bearer {jwt}" + # Add custom headers with proper casing if custom_headers is not None: for k, v in custom_headers.items(): lower_k = k.lower() - headers[proper_case.get(lower_k, k)] = v + proper_k = proper_case.get(lower_k, k) + headers[proper_k] = v return headers @@ -126,14 +106,29 @@ def post( """Make HTTP POST request using connection pooling""" result = Response() try: - # Prepare headers with case-insensitive handling headers = cls._prepare_headers(api_key, parent_key, jwt, header) session = cls.get_session() - # Make request with prepared headers res = session.post(url, data=payload, headers=headers, timeout=20) result.parse(res) + if result.code == 401 and jwt is not None and "/v2/create_session" not in url: + try: + reauth_payload = json.dumps({"session_id": json.loads(payload)["session_id"]}).encode("utf-8") + reauth_url = url.replace(url.split("/v2/")[1], "reauthorize_jwt") + reauth_headers = cls._prepare_headers(api_key, None, None, None) + + reauth_res = session.post(reauth_url, data=reauth_payload, headers=reauth_headers, timeout=20) + reauth_result = Response() + reauth_result.parse(reauth_res) + + if reauth_result.status == HttpStatus.SUCCESS and "jwt" in reauth_result.body: + new_headers = cls._prepare_headers(api_key, parent_key, reauth_result.body["jwt"], header) + retry_res = session.post(url, data=payload, headers=new_headers, timeout=20) + result.parse(retry_res) + except Exception as e: + logger.error(f"JWT reauthorization failed: {str(e)}") + except requests.exceptions.Timeout: result.code = 408 result.status = HttpStatus.TIMEOUT @@ -151,7 +146,6 @@ def post( result.body = {"error": str(e)} raise ApiServerException(f"RequestException: {e}") - # Handle response status codes if result.code == 401: raise ApiServerException( f"API server: invalid API key or JWT. Find your API key at https://app.agentops.ai/settings/projects" @@ -182,6 +176,24 @@ def get( res = session.get(url, headers=headers, timeout=20) result.parse(res) + if result.code == 401 and jwt is not None and "/v2/create_session" not in url: + try: + session_id = url.split("/")[-1] + reauth_payload = json.dumps({"session_id": session_id}).encode("utf-8") + reauth_url = url.replace(url.split("/v2/")[1], "reauthorize_jwt") + reauth_headers = cls._prepare_headers(api_key, None, None, None) + + reauth_res = session.post(reauth_url, data=reauth_payload, headers=reauth_headers, timeout=20) + reauth_result = Response() + reauth_result.parse(reauth_res) + + if reauth_result.status == HttpStatus.SUCCESS and "jwt" in reauth_result.body: + new_headers = cls._prepare_headers(api_key, None, reauth_result.body["jwt"], header) + retry_res = session.get(url, headers=new_headers, timeout=20) + result.parse(retry_res) + except Exception as e: + logger.error(f"JWT reauthorization failed: {str(e)}") + except requests.exceptions.Timeout: result.code = 408 result.status = HttpStatus.TIMEOUT diff --git a/agentops/session.py b/agentops/session.py index 351ecdc5..bee85b89 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -6,10 +6,13 @@ import threading import traceback import uuid +import time from threading import Lock from datetime import datetime, timezone from decimal import ROUND_HALF_UP, Decimal from typing import Any, Dict, List, Optional, Sequence, Union +from uuid import UUID, uuid4 +import platform import requests from opentelemetry import trace @@ -25,8 +28,8 @@ from termcolor import colored from .config import Configuration -from .enums import EndState, EventType -from .event import ActionEvent, ErrorEvent, Event +from .enums import EndState, EventType, HttpStatus +from .event import ActionEvent, ErrorEvent, Event, LLMEvent, ToolEvent from .exceptions import ApiServerException from .helpers import filter_unjsonable, get_ISO_time, safe_serialize from .http_client import HttpClient, Response @@ -67,11 +70,9 @@ class SessionExporter(SpanExporter): - """ - Manages publishing events for Session - """ - - def __init__(self, session: Session, **kwargs): + """Export OpenTelemetry spans to AgentOps API.""" + def __init__(self, session: "Session", **kwargs): + """Initialize the exporter with a session.""" self.session = session self._shutdown = threading.Event() self._export_lock = threading.Lock() @@ -79,383 +80,411 @@ def __init__(self, session: Session, **kwargs): @property def endpoint(self): + """Get the endpoint URL.""" return f"{self.session.config.endpoint}/v2/create_events" def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + """Export the spans to AgentOps API.""" if self._shutdown.is_set(): return SpanExportResult.SUCCESS with self._export_lock: try: - # Skip if no spans to export if not spans: return SpanExportResult.SUCCESS events = [] for span in spans: - event_data = json.loads(span.attributes.get("event.data", "{}")) - - # Format event data based on event type - if span.name == "actions": - formatted_data = { - "action_type": event_data.get("action_type", event_data.get("name", "unknown_action")), - "params": event_data.get("params", {}), - "returns": event_data.get("returns"), - } - elif span.name == "tools": - formatted_data = { - "name": event_data.get("name", event_data.get("tool_name", "unknown_tool")), - "params": event_data.get("params", {}), - "returns": event_data.get("returns"), - } - else: - formatted_data = event_data - - formatted_data = {**event_data, **formatted_data} - # Get timestamps, providing defaults if missing + if not span.attributes: + logger.warning("Span has no attributes") + continue + + # Ensure required fields are present + event_id = span.attributes.get("event.id") + if not event_id: + event_id = str(uuid.uuid4()) + logger.debug(f"Generated new event ID: {event_id}") + + # Get timestamps, ensuring they are always present current_time = datetime.now(timezone.utc).isoformat() init_timestamp = span.attributes.get("event.timestamp") - end_timestamp = span.attributes.get("event.end_timestamp") - - # Handle missing timestamps - if init_timestamp is None: + if not init_timestamp: init_timestamp = current_time - if end_timestamp is None: + logger.debug(f"Using current time for init_timestamp: {init_timestamp}") + + end_timestamp = span.attributes.get("event.end_timestamp") + if not end_timestamp: end_timestamp = current_time + logger.debug(f"Using current time for end_timestamp: {end_timestamp}") - # Get event ID, generate new one if missing - event_id = span.attributes.get("event.id") - if event_id is None: - event_id = str(uuid.uuid4()) + event_data = { + "id": event_id, + "event_type": span.name, + "init_timestamp": init_timestamp, + "end_timestamp": end_timestamp, + "session_id": str(self.session.session_id), + } - events.append( - { - "id": event_id, - "event_type": span.name, - "init_timestamp": init_timestamp, - "end_timestamp": end_timestamp, - **formatted_data, - "session_id": str(self.session.session_id), - } - ) - - # Only make HTTP request if we have events and not shutdown + # Add any additional data from span attributes + try: + additional_data = json.loads(span.attributes.get("event.data", "{}")) + event_data.update(additional_data) + except json.JSONDecodeError: + logger.error("Failed to decode event data JSON") + continue + + events.append(event_data) + + # Send events to API if events: try: - res = HttpClient.post( - self.endpoint, - json.dumps({"events": events}).encode("utf-8"), - api_key=self.session.config.api_key, - jwt=self.session.jwt, + response = self.session._get_response( + "POST", + "/v2/create_events", + json={"events": events}, ) - return SpanExportResult.SUCCESS if res.code == 200 else SpanExportResult.FAILURE + if response.status != HttpStatus.SUCCESS: + logger.error(f"Failed to export events: {response.body}") + return SpanExportResult.FAILURE except Exception as e: - logger.error(f"Failed to send events: {e}") + logger.error(f"Error exporting events: {e}") return SpanExportResult.FAILURE return SpanExportResult.SUCCESS except Exception as e: - logger.error(f"Failed to export spans: {e}") + logger.error(f"Error in export: {e}") return SpanExportResult.FAILURE def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + """Force flush the exporter.""" return True def shutdown(self) -> None: - """Handle shutdown gracefully""" + """Shutdown the exporter.""" self._shutdown.set() - # Don't call session.end_session() here to avoid circular dependencies - class Session: - """ - Represents a session of events, with a start and end state. - - Args: - session_id (UUID): The session id is used to record particular runs. - config (Configuration): The configuration object for the session. - tags (List[str], optional): Tags that can be used for grouping or sorting later. Examples could be ["GPT-4"]. - host_env (dict, optional): A dictionary containing host and environment data. - - Attributes: - init_timestamp (str): The ISO timestamp for when the session started. - end_timestamp (str, optional): The ISO timestamp for when the session ended. Only set after end_session is called. - end_state (str, optional): The final state of the session. Options: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate". - end_state_reason (str, optional): The reason for ending the session. - session_id (UUID): Unique identifier for the session. - tags (List[str]): List of tags associated with the session for grouping and filtering. - video (str, optional): URL to a video recording of the session. - host_env (dict, optional): Dictionary containing host and environment data. - config (Configuration): Configuration object containing settings for the session. - jwt (str, optional): JSON Web Token for authentication with the AgentOps API. - token_cost (Decimal): Running total of token costs for the session. - event_counts (dict): Counter for different types of events: - - llms: Number of LLM calls - - tools: Number of tool calls - - actions: Number of actions - - errors: Number of errors - - apis: Number of API calls - session_url (str, optional): URL to view the session in the AgentOps dashboard. - is_running (bool): Flag indicating if the session is currently active. - """ - + """Main session class for AgentOps.""" def __init__( self, + session_id: UUID, api_key: str, - session_id: Optional[str] = None, tags: Optional[List[str]] = None, host_env: Optional[Dict] = None, config: Optional[Configuration] = None, + inherited_session_id: Optional[str] = None, ): - """Initialize Session.""" - self.api_key = api_key - self.session_id = session_id or str(uuid.uuid4()) + """Initialize a session.""" + self.session_id = session_id self.tags = tags or [] - self.host_env = host_env - self.config = config or Configuration() - self.jwt = None - self.init_timestamp = datetime.utcnow().isoformat() + self.host_env = host_env or {} + self.inherited_session_id = inherited_session_id + self.init_timestamp = datetime.now(timezone.utc).isoformat() self.end_timestamp = None self.end_state = None self.end_state_reason = None - self.video = None - self.ended = False self.is_running = False + self.jwt = None + self.ended = False self.token_cost = Decimal("0") + + # Initialize locks + self._lock = threading.Lock() + self._export_lock = threading.Lock() + self._record_lock = threading.Lock() + self._end_session_lock = threading.Lock() + + # Initialize configuration + if config: + self._config = config + else: + self._config = Configuration() + self._config.api_key = api_key + + # Initialize event counters + self._llm_calls = 0 + self._tool_calls = 0 + self._action_calls = 0 + self._error_calls = 0 self.event_counts = { "llms": 0, "tools": 0, "actions": 0, - "apis": 0, - "errors": 0, + "errors": 0 } - # Initialize locks - self._end_session_lock = Lock() - self._update_session_lock = Lock() - self._record_lock = Lock() - - # Initialize tracing - self._tracer_provider = TracerProvider() - self._exporter = SessionExporter(self) - self._span_processor = BatchSpanProcessor(self._exporter) - self._tracer_provider.add_span_processor(self._span_processor) - self._tracer = self._tracer_provider.get_tracer(__name__) - self._otel_tracer = self._tracer - self._otel_exporter = self._exporter - - # Start session - if self._start_session(): - self.is_running = True + # Initialize event queue for pre-start events + self._event_queue = [] + + # Set up telemetry + self._setup_telemetry() + + # Start session if not already started + if not self.end_timestamp: + if not self._start_session(): + logger.error("Failed to start session during initialization") + raise Exception("Failed to start session") def end_session( self, - end_state: Optional[str] = None, + end_state: Optional[Union[EndState, str]] = None, end_state_reason: Optional[str] = None, - video: Optional[str] = None, - force: bool = False, - ) -> None: - """End session and send final update to AgentOps API.""" - if self.ended and not force: - logger.warning("Session already ended") - return + video: Optional[str] = None, # Added to match Client interface + ) -> Optional[Decimal]: + """End the session.""" + with self._end_session_lock: + if not self.is_running or self.ended: + logger.warning("Session already ended") + return None - try: - self.end_state = end_state or "Success" # Default to Success if not provided + self.end_timestamp = get_ISO_time() + self.end_state = end_state or EndState.UNKNOWN self.end_state_reason = end_state_reason - self.video = video - self.end_timestamp = datetime.utcnow().isoformat() + self.ended = True + self.is_running = False - # Update session first to get the response + # Update session one last time self._update_session() - analytics_stats = self.get_analytics() - if not analytics_stats: - logger.warning("Could not get analytics stats") - self.ended = True + return self.token_cost - except Exception as e: - logger.error(f"Error during session end: {str(e)}") - traceback.print_exc() + def add_tags(self, tags: Union[str, List[str]]) -> None: + """Add tags to the session.""" + if isinstance(tags, str): + tags = [tags] - def add_tags(self, tags: List[str]) -> None: - """ - Append to session tags at runtime. - """ - if not self.is_running: + if not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags): + logger.warning("Invalid tags format. Tags must be a string or list of strings.") return - if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): - tags = [tags] - - # Initialize tags if None - if self.tags is None: - self.tags = [] + # Ensure no duplicates and maintain order + seen = set() + self.tags = [tag for tag in self.tags + tags if not (tag in seen or seen.add(tag))] - # Add new tags that don't exist - for tag in tags: - if tag not in self.tags: - self.tags.append(tag) + # Update session with new tags + try: + self._update_session(force_update=True) + except Exception as e: + logger.error(f"Failed to update session with new tags: {e}") - # Update session state immediately - self._update_session() + def set_tags(self, tags: Union[str, List[str]]) -> None: + """Set session tags, replacing any existing tags.""" + if isinstance(tags, str): + tags = [tags] - def set_tags(self, tags): - """Set session tags, replacing any existing tags""" - if not self.is_running: + if not isinstance(tags, list) or not all(isinstance(tag, str) for tag in tags): + logger.warning("Invalid tags format. Tags must be a string or list of strings.") return - if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): - tags = [tags] - - # Set tags directly - self.tags = tags.copy() # Make a copy to avoid reference issues - - # Update session state immediately - self._update_session() - - def record(self, event: Union[Event, ErrorEvent]) -> None: - """ - Record an event with the AgentOps service. + self.tags = list(dict.fromkeys(tags)) # Remove duplicates while maintaining order + try: + self._update_session(force_update=True) + except Exception as e: + logger.error(f"Failed to update session with new tags: {e}") - Args: - event (Event): The event to record. - """ + def record(self, event: Event) -> None: + """Record an event.""" + # Queue events if session isn't running yet if not self.is_running: + self._event_queue.append(event) return - # Ensure event has timestamps - if event.init_timestamp is None: + # Prepare event data outside the lock + if not hasattr(event, 'init_timestamp') or event.init_timestamp is None: event.init_timestamp = get_ISO_time() - if event.end_timestamp is None: + if not hasattr(event, 'end_timestamp') or event.end_timestamp is None: event.end_timestamp = get_ISO_time() - # Update event counts - if isinstance(event, ErrorEvent): - self.event_counts["errors"] += 1 - elif event.event_type == EventType.LLM: - self.event_counts["llms"] += 1 - elif event.event_type == EventType.TOOL: - self.event_counts["tools"] += 1 - elif event.event_type == EventType.ACTION: - self.event_counts["actions"] += 1 - elif event.event_type == EventType.API: - self.event_counts["apis"] += 1 - - # Create span for event - with self._otel_tracer.start_as_current_span( - event.event_type, - attributes={ - "event.id": str(event.id), - "event.data": safe_serialize(event.to_dict()), - "event.timestamp": event.init_timestamp, - "event.end_timestamp": event.end_timestamp, - }, - ) as span: - # Let the span end naturally when the context exits - pass - - # Update token cost if applicable - if hasattr(event, "token_cost"): - self.token_cost += Decimal(str(event.token_cost)) - - # Don't update session for every event to reduce requests - if event.event_type != EventType.ACTION or event.action_type != "create_agent": - self._update_session() + logger.debug(f"Recording event: type={event.event_type}, class={type(event)}") + logger.debug(f"Event dict: {event.to_dict()}") - def _send_event(self, event: Union[Event, ErrorEvent]) -> None: - """Send event to AgentOps API.""" - headers = { - "Content-Type": "application/json", - "X-Agentops-Api-Key": self.api_key, - "Authorization": f"Bearer {self.jwt}", - } - self._get_response("POST", "/v2/create_events", headers=headers, json={"events": [event.to_dict()]}) + try: + # Only lock while updating shared state + acquired = self._record_lock.acquire(timeout=5) # 5 second timeout + if not acquired: + logger.error("Failed to acquire lock for event recording - possible deadlock") + return - def _reauthorize_jwt(self) -> Union[str, None]: - with self._lock: - payload = {"session_id": self.session_id} - serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") - res = HttpClient.post( - f"{self.config.endpoint}/v2/reauthorize_jwt", - serialized_payload, - self.config.api_key, + try: + # Update event counts + if isinstance(event, ActionEvent): + self.event_counts["actions"] += 1 + self._action_calls += 1 + logger.debug("Counted as ActionEvent") + elif isinstance(event, ErrorEvent): + self.event_counts["errors"] += 1 + self._error_calls += 1 + logger.debug("Counted as ErrorEvent") + elif isinstance(event, LLMEvent): + self.event_counts["llms"] += 1 + self._llm_calls += 1 + logger.debug("Counted as LLMEvent") + elif isinstance(event, ToolEvent): + self.event_counts["tools"] += 1 + self._tool_calls += 1 + logger.debug("Counted as ToolEvent") + + # Send event to server + self._send_event(event) + + # Update token cost if applicable + if hasattr(event, "token_cost"): + self.token_cost += Decimal(str(event.token_cost)) + + finally: + self._record_lock.release() + + except Exception as e: + logger.error(f"Error recording event: {e}") + if isinstance(e, ApiServerException): + raise + + def _send_event(self, event: Event) -> None: + """Send an event to the server.""" + try: + event_data = event.to_dict() + event_data["session_id"] = str(self.session_id) + + # Ensure required fields are present + if "init_timestamp" not in event_data: + event_data["init_timestamp"] = get_ISO_time() + if "end_timestamp" not in event_data: + event_data["end_timestamp"] = get_ISO_time() + if "id" not in event_data: + event_data["id"] = str(uuid.uuid4()) + + response = self._get_response( + "POST", + "/v2/create_events", + json={"events": [event_data]}, ) - logger.debug(res.body) + if response.status != HttpStatus.SUCCESS: + logger.error(f"Failed to send event: {response.body}") + raise ApiServerException(f"Failed to send event: {response.body}") - if res.code != 200: - return None + except Exception as e: + logger.error(f"Error sending event: {e}") + raise - jwt = res.body.get("jwt", None) - self.jwt = jwt - return jwt + def _reauthorize_jwt(self) -> bool: + """Reauthorize JWT token.""" + try: + response = self._get_response( + "POST", + "/v2/reauthorize_jwt", + json={"session_id": str(self.session_id)}, + ) + + if response.status == HttpStatus.SUCCESS and "jwt" in response.body: + self.jwt = response.body["jwt"] + return True + return False + except Exception as e: + logger.error(f"Failed to reauthorize JWT: {e}") + return False def _start_session(self) -> bool: - """Start session with AgentOps API.""" - if not self.api_key: - logger.error("Could not start session - API Key is missing") + """Start a session.""" + if not hasattr(self, '_config') or not self._config or not self._config.api_key: + logger.error("Could not initialize AgentOps client - API Key is missing or config is invalid.") return False - headers = { - "Content-Type": "application/json", - "X-Agentops-Api-Key": self.api_key, - } + try: + # Create session payload with all required data + payload = { + "session": { + "session_id": str(self.session_id), + "init_timestamp": self.init_timestamp, + "tags": self.tags, + "host_env": self.host_env, + } + } + + if hasattr(self, 'inherited_session_id') and self.inherited_session_id: + payload["session"]["inherited_session_id"] = str(self.inherited_session_id) + + # Single request to create session and get JWT + response = self._get_response( + "POST", + "/v2/create_session", + json=payload, + ) - response = self._get_response( - "POST", - "/v2/create_session", - headers=headers, - json={ - "session_id": str(self.session_id), - "tags": self.tags, - "host_env": self.host_env, - }, - ) + if response.status != HttpStatus.SUCCESS: + logger.error(f"Failed to start session: {response.body}") + return False + + # Extract JWT from response + self.jwt = response.body.get("jwt") + if not self.jwt: + logger.error("No JWT in response from create_session") + return False + + self.is_running = True + + # Initialize event tracking after session is started + self._initialize_event_tracking() + + # Send any queued events + if hasattr(self, '_event_queue') and self._event_queue: + logger.debug(f"Processing {len(self._event_queue)} queued events") + for event in self._event_queue: + self.record(event) + self._event_queue = [] + + return True - if not response.ok: + except Exception as e: + logger.error(f"Error starting session: {e}") return False - data = response.json() - if "jwt" not in data: + def _update_session(self, force_update: bool = False) -> bool: + """Update session state.""" + if not self.is_running and not force_update: return False - self.jwt = data["jwt"] - return True + with self._lock: + try: + # Prepare session update payload with all required fields + payload = { + "session": { + "session_id": str(self.session_id), + "init_timestamp": self.init_timestamp, + "end_timestamp": self.end_timestamp, + "end_state": self.end_state.value if hasattr(self.end_state, 'value') else self.end_state, + "end_state_reason": self.end_state_reason, + "tags": self.tags, + "host_env": self.host_env, + "event_counts": self.event_counts, + "token_cost": str(self.token_cost) if self.token_cost else "0", + } + } + + # Remove None values + payload["session"] = {k: v for k, v in payload["session"].items() if v is not None} + + # Use _get_response for consistent header and auth handling + response = self._get_response( + "POST", + "/v2/update_session", + json=payload + ) - def _update_session(self, force: bool = False) -> None: - """Update session with AgentOps API.""" - if not self.api_key: - logger.error("Could not update session - API Key is missing") - return + if response.status != HttpStatus.SUCCESS: + logger.error(f"Failed to update session: {response.body}") + return False - headers = { - "Content-Type": "application/json", - "X-Agentops-Api-Key": self.api_key, - } - if self.jwt: - headers["Authorization"] = f"Bearer {self.jwt}" + # Update token cost if available + if "token_cost" in response.body: + self.token_cost = Decimal(str(response.body["token_cost"])) - data = { - "session_id": str(self.session_id), - "end_state": self.end_state, - "end_state_reason": self.end_state_reason, - "video": self.video, - "end_timestamp": self.end_timestamp, - } - - response = self._get_response( - "POST", - "/v2/update_session", - headers=headers, - json=data, - ) + return True - if not response.ok: - logger.error(f"Failed to update session: {response.text}") + except Exception as e: + logger.error(f"Error updating session: {e}") + return False def create_agent(self, name, agent_id=None, skip_event=False): """ @@ -486,7 +515,7 @@ def create_agent(self, name, agent_id=None, skip_event=False): f"{self.config.endpoint}/v2/create_agent", json.dumps(payload).encode("utf-8"), api_key=self.config.api_key, - jwt=self.jwt, + jwt=self.jwt if self.jwt else None, ) # Only record the event if skip_event is False @@ -500,6 +529,8 @@ def create_agent(self, name, agent_id=None, skip_event=False): ) self.record(event) + # Update session after successful agent creation + self._update_session() return agent_id except ApiServerException as e: return logger.error(f"Could not create agent - {e}") @@ -512,42 +543,65 @@ def wrapper(*args, **kwargs): return wrapper - def _get_response(self, method: str, endpoint: str, **kwargs) -> requests.Response: - """Make HTTP request to AgentOps API.""" - url = f"{self.config.endpoint}{endpoint}" - - # Initialize base headers - base_headers = {"X-AgentOps-Api-Key": self.api_key} - if self.jwt: - base_headers["Authorization"] = f"Bearer {self.jwt}" - - # Merge with provided headers if any - if "headers" in kwargs: - headers = {**base_headers, **kwargs["headers"]} - kwargs["headers"] = headers - else: - kwargs["headers"] = base_headers + def _get_response(self, method: str, endpoint: str, **kwargs) -> Response: + """Make a request to the API.""" + try: + url = f"{self._config.endpoint}{endpoint}" + + # Extract data from kwargs + data = kwargs.get("data", "") + if "json" in kwargs: + data = json.dumps(kwargs["json"]).encode("utf-8") + + # Extract headers + headers = kwargs.get("headers", {}) + + # Use HttpClient's methods with proper error handling + if method.upper() == "POST": + return HttpClient.post( + url=url, + payload=data, + api_key=self._config.api_key, + jwt=self.jwt if endpoint != "/v2/create_session" else None, + header=headers + ) + elif method.upper() == "GET": + return HttpClient.get( + url=url, + api_key=self._config.api_key, + jwt=self.jwt if endpoint != "/v2/create_session" else None, + header=headers + ) + else: + raise ValueError(f"Unsupported HTTP method: {method}") - response = requests.request(method, url, **kwargs) - if response.status_code == 401: - self._reauthorize_jwt() - kwargs["headers"]["Authorization"] = f"Bearer {self.jwt}" - response = requests.request(method, url, **kwargs) - return response + except Exception as e: + logger.error(f"Request failed: {str(e)}") + return Response( + status=500, + body={"error": str(e)}, + headers={} + ) def _format_duration(self, start_time, end_time) -> str: """Format duration between two timestamps.""" try: - # Handle both datetime objects and ISO format strings + # Convert string timestamps to datetime objects with UTC timezone if isinstance(start_time, str): start = datetime.fromisoformat(start_time.replace("Z", "+00:00")) else: - start = start_time + start = start_time.replace(tzinfo=timezone.utc) if start_time.tzinfo is None else start_time if isinstance(end_time, str): end = datetime.fromisoformat(end_time.replace("Z", "+00:00")) else: - end = end_time + end = end_time.replace(tzinfo=timezone.utc) if end_time.tzinfo is None else end_time + + # Ensure both timestamps are timezone-aware + if start.tzinfo is None: + start = start.replace(tzinfo=timezone.utc) + if end.tzinfo is None: + end = end.replace(tzinfo=timezone.utc) duration = end - start return str(int(duration.total_seconds())) @@ -556,10 +610,12 @@ def _format_duration(self, start_time, end_time) -> str: return "0" def _get_token_cost(self, response: Response) -> Decimal: - token_cost = response.body.get("token_cost", "unknown") - if token_cost == "unknown" or token_cost is None: - return Decimal(0) - return Decimal(token_cost) + """Get token cost from response.""" + try: + token_cost = response.json().get("token_cost", "unknown") + return Decimal(token_cost) if token_cost != "unknown" else Decimal("0") + except (ValueError, AttributeError, KeyError): + return Decimal("0") def _format_token_cost(self, token_cost: Decimal) -> str: return ( @@ -575,8 +631,9 @@ def get_analytics(self) -> Optional[Dict[str, Any]]: formatted_duration = self._format_duration(self.init_timestamp, self.end_timestamp) - # No need to make another update_session request here - # Just return the analytics data + # Update session to get token cost + self._update_session() + return { "LLM calls": self.event_counts["llms"], "Tool calls": self.event_counts["tools"], @@ -586,15 +643,45 @@ def get_analytics(self) -> Optional[Dict[str, Any]]: "Cost": self._format_token_cost(self.token_cost), } + def _setup_telemetry(self) -> None: + """Set up OpenTelemetry tracing.""" + # Create a Resource object with service name + resource = Resource.create({ + SERVICE_NAME: "agentops-sdk", + "session.id": str(self.session_id), + "platform.system": platform.system(), + "platform.release": platform.release(), + }) + + # Create a TracerProvider with the resource + self._tracer_provider = TracerProvider(resource=resource) + + # Create and register our custom exporter + self._otel_exporter = SessionExporter(self) + span_processor = BatchSpanProcessor(self._otel_exporter) + self._tracer_provider.add_span_processor(span_processor) + + # Set the TracerProvider as the global default + trace.set_tracer_provider(self._tracer_provider) + + # Get a tracer + self.tracer = trace.get_tracer(__name__) + + def _initialize_event_tracking(self) -> None: + """Initialize event tracking.""" + # Initialize event tracking state + self.event_counts = { + "llms": 0, + "tools": 0, + "actions": 0, + "errors": 0, + } + self.token_cost = Decimal("0") + @property def session_url(self) -> str: """Returns the URL for this session in the AgentOps dashboard.""" assert self.session_id, "Session ID is required to generate a session URL" return f"https://app.agentops.ai/drilldown?session_id={self.session_id}" - # @session_url.setter - # def session_url(self, url: str): - # pass - - active_sessions: List[Session] = []