diff --git a/agentops/session.py b/agentops/session.py index 3cfc1303..1213e2fd 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -5,7 +5,7 @@ import time from decimal import ROUND_HALF_UP, Decimal from termcolor import colored -from typing import Optional, List, Union +from typing import Any, Optional, List, Union from uuid import UUID, uuid4 from datetime import datetime @@ -15,7 +15,7 @@ from .log_config import logger from .config import Configuration from .helpers import get_ISO_time, filter_unjsonable, safe_serialize -from .http_client import HttpClient +from .http_client import HttpClient, Response class Session: @@ -24,14 +24,30 @@ class Session: Args: session_id (UUID): The session id is used to record particular runs. + config (Configuration): The configuration object for the session. tags (List[str], optional): Tags that can be used for grouping or sorting later. Examples could be ["GPT-4"]. + host_env (dict, optional): A dictionary containing host and environment data. Attributes: - init_timestamp (float): The timestamp for when the session started, represented as seconds since the epoch. - end_timestamp (float, optional): The timestamp for when the session ended, represented as seconds since the epoch. This is only set after end_session is called. - end_state (str, optional): The final state of the session. Suggested: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate". + init_timestamp (str): The ISO timestamp for when the session started. + end_timestamp (str, optional): The ISO timestamp for when the session ended. Only set after end_session is called. + end_state (str, optional): The final state of the session. Options: "Success", "Fail", "Indeterminate". Defaults to "Indeterminate". end_state_reason (str, optional): The reason for ending the session. - + session_id (UUID): Unique identifier for the session. + tags (List[str]): List of tags associated with the session for grouping and filtering. + video (str, optional): URL to a video recording of the session. + host_env (dict, optional): Dictionary containing host and environment data. + config (Configuration): Configuration object containing settings for the session. + jwt (str, optional): JSON Web Token for authentication with the AgentOps API. + token_cost (Decimal): Running total of token costs for the session. + event_counts (dict): Counter for different types of events: + - llms: Number of LLM calls + - tools: Number of tool calls + - actions: Number of actions + - errors: Number of errors + - apis: Number of API calls + session_url (str, optional): URL to view the session in the AgentOps dashboard. + is_running (bool): Flag indicating if the session is currently active. """ def __init__( @@ -52,7 +68,8 @@ def __init__( self.config = config self.jwt = None self.lock = threading.Lock() - self.queue = [] + self.queue: List[Any] = [] + self.token_cost = Decimal(0) self.event_counts = { "llms": 0, "tools": 0, @@ -60,6 +77,7 @@ def __init__( "errors": 0, "apis": 0, } + self.session_url: Optional[str] = None self.stop_flag = threading.Event() self.thread = threading.Thread(target=self._run) @@ -87,10 +105,11 @@ def end_session( video: Optional[str] = None, ) -> Union[Decimal, None]: if not self.is_running: - return + return None if not any(end_state == state.value for state in EndState): - return logger.warning("Invalid end_state. Please use one of the EndState enums") + logger.warning("Invalid end_state. Please use one of the EndState enums") + return None self.end_timestamp = get_ISO_time() self.end_state = end_state @@ -101,77 +120,28 @@ def end_session( self.stop_flag.set() self.thread.join(timeout=1) self._flush_queue() - - def format_duration(start_time, end_time): - start = datetime.fromisoformat(start_time.replace("Z", "+00:00")) - end = datetime.fromisoformat(end_time.replace("Z", "+00:00")) - duration = end - start - - hours, remainder = divmod(duration.total_seconds(), 3600) - minutes, seconds = divmod(remainder, 60) - - parts = [] - if hours > 0: - parts.append(f"{int(hours)}h") - if minutes > 0: - parts.append(f"{int(minutes)}m") - parts.append(f"{seconds:.1f}s") - - return " ".join(parts) - - with self.lock: - payload = {"session": self.__dict__} - try: - res = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - jwt=self.jwt, - ) - except ApiServerException as e: - return logger.error(f"Could not end session - {e}") - - logger.debug(res.body) - token_cost = res.body.get("token_cost", "unknown") - - formatted_duration = format_duration(self.init_timestamp, self.end_timestamp) - - if token_cost == "unknown" or token_cost is None: - token_cost_d = Decimal(0) - else: - token_cost_d = Decimal(token_cost) - - formatted_cost = ( - "{:.2f}".format(token_cost_d) - if token_cost_d == 0 - else "{:.6f}".format(token_cost_d.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)) - ) + analytics_stats = self.get_analytics() analytics = ( f"Session Stats - " - f"{colored('Duration:', attrs=['bold'])} {formatted_duration} | " - f"{colored('Cost:', attrs=['bold'])} ${formatted_cost} | " - f"{colored('LLMs:', attrs=['bold'])} {self.event_counts['llms']} | " - f"{colored('Tools:', attrs=['bold'])} {self.event_counts['tools']} | " - f"{colored('Actions:', attrs=['bold'])} {self.event_counts['actions']} | " - f"{colored('Errors:', attrs=['bold'])} {self.event_counts['errors']}" + f"{colored('Duration:', attrs=['bold'])} {analytics_stats['Duration']} | " + f"{colored('Cost:', attrs=['bold'])} ${analytics_stats['Cost']} | " + f"{colored('LLMs:', attrs=['bold'])} {analytics_stats['LLM calls']} | " + f"{colored('Tools:', attrs=['bold'])} {analytics_stats['Tool calls']} | " + f"{colored('Actions:', attrs=['bold'])} {analytics_stats['Actions']} | " + f"{colored('Errors:', attrs=['bold'])} {analytics_stats['Errors']}" ) logger.info(analytics) - session_url = res.body.get( - "session_url", - f"https://app.agentops.ai/drilldown?session_id={self.session_id}", - ) - logger.info( colored( - f"\x1b[34mSession Replay: {session_url}\x1b[0m", + f"\x1b[34mSession Replay: {self.session_url}\x1b[0m", "blue", ) ) - active_sessions.remove(self) - return token_cost_d + return self.token_cost def add_tags(self, tags: List[str]) -> None: """ @@ -388,5 +358,80 @@ def wrapper(*args, **kwargs): return wrapper + @staticmethod + def _format_duration(start_time, end_time): + start = datetime.fromisoformat(start_time.replace("Z", "+00:00")) + end = datetime.fromisoformat(end_time.replace("Z", "+00:00")) + duration = end - start + + hours, remainder = divmod(duration.total_seconds(), 3600) + minutes, seconds = divmod(remainder, 60) + + parts = [] + if hours > 0: + parts.append(f"{int(hours)}h") + if minutes > 0: + parts.append(f"{int(minutes)}m") + parts.append(f"{seconds:.1f}s") + + return " ".join(parts) + + def _get_response(self) -> Optional[Response]: + with self.lock: + payload = {"session": self.__dict__} + try: + response = HttpClient.post( + f"{self.config.endpoint}/v2/update_session", + json.dumps(filter_unjsonable(payload)).encode("utf-8"), + jwt=self.jwt, + ) + except ApiServerException as e: + logger.error(f"Could not fetch response from server - {e}") + return None + + logger.debug(response.body) + return response + + def _get_token_cost(self, response: Response) -> Decimal: + token_cost = response.body.get("token_cost", "unknown") + if token_cost == "unknown" or token_cost is None: + return Decimal(0) + return Decimal(token_cost) + + @staticmethod + def _format_token_cost(token_cost_d): + return ( + "{:.2f}".format(token_cost_d) + if token_cost_d == 0 + else "{:.6f}".format(token_cost_d.quantize(Decimal("0.000001"), rounding=ROUND_HALF_UP)) + ) + + def get_analytics(self) -> Optional[dict[str, Union[Decimal, str]]]: + if not self.end_timestamp: + self.end_timestamp = get_ISO_time() + + formatted_duration = self._format_duration(self.init_timestamp, self.end_timestamp) + + response = self._get_response() + if response is None: + return None + + self.token_cost = self._get_token_cost(response) + formatted_cost = self._format_token_cost(self.token_cost) + + self.session_url = response.body.get( + "session_url", + f"https://app.agentops.ai/drilldown?session_id={self.session_id}", + ) + + return { + "LLM calls": self.event_counts["llms"], + "Tool calls": self.event_counts["tools"], + "Actions": self.event_counts["actions"], + "Errors": self.event_counts["errors"], + "Duration": formatted_duration, + "Cost": formatted_cost, + } + active_sessions: List[Session] = [] diff --git a/docs/v1/concepts/sessions.mdx b/docs/v1/concepts/sessions.mdx index 1da20fb3..68beaf02 100644 --- a/docs/v1/concepts/sessions.mdx +++ b/docs/v1/concepts/sessions.mdx @@ -51,6 +51,9 @@ Optionally, sessions may include: _Note: Overrides any current tags_ +#### `get_analytics` +**Returns** (dict): A dictionary containing various analytics metrics for the session. + ## Starting a Session When you call `agentops.init()`, a session is automatically started. @@ -62,7 +65,7 @@ Both `agentops.init()` and `agentops.start_session()` work as a factory pattern ## Ending a Session If a process ends without any call to agentops, it will show in the dashboard as `Indeterminate`. -To end with a state, call either `agentops.end_session(...)` [(reference)](/v1/usage/sdk-reference/#end-session) if only one session is in use. Otherwise use `session.end_session(...)` +To end with a state, call either `agentops.end_session(...)` [(reference)](/v1/usage/sdk-reference/#end-session) if only one session is in use. Otherwise use `session.end_session(...)`. ## Inherited Sessions When working with multiple agents running in different processes, it's possible to initialize AgentOps or start a session @@ -71,22 +74,51 @@ with an existing session_id. `agentops.init(inherited_session_id=)` `agentops.start_session(inherited_session_id=)` -You can retrieve the current `session_id` by assigning the returned value from `init()` or `start_session()` +You can retrieve the current `session_id` by assigning the returned value from `init()` or `start_session()`. -```python python + +```python import agentops session = agentops.init() # pass session.session_id to the other process +``` +```python # -- other process -- session_id = retrieve_session_id() # <-- your function agentops.init(inherited_session_id=) ``` + Both processes will now contribute data to the same session. +## Session Analytics +You can retrieve the analytics for a session by calling `session.get_analytics()`. + +The example below shows how to record events and retrieve analytics. + + + +```python +import agentops +session = agentops.init() +session.record(ActionEvent("llms")) +session.record(ActionEvent("tools")) +analytics = session.get_analytics() +print(analytics) +session.end_session("Success") +``` + +The output will look like this - + +```bash +{'LLM calls': 0, 'Tool calls': 0, 'Actions': 0, 'Errors': 0, 'Duration': '0.9s', 'Cost': '0.00'} +``` + + + ## The AgentOps SDK Client _More info for the curious_ diff --git a/tests/test_session.py b/tests/test_session.py index 1392cc91..25ad8246 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -183,6 +183,43 @@ def test_safe_get_session_with_multiple_sessions(self, mock_req): session = Client()._safe_get_session() assert session is None + def test_get_analytics(self, mock_req): + # Arrange + session = agentops.start_session() + session.add_tags(["test-session-analytics-tag"]) + assert session is not None + + # Record some events to increment counters + session.record(ActionEvent("llms")) + session.record(ActionEvent("tools")) + session.record(ActionEvent("actions")) + session.record(ActionEvent("errors")) + time.sleep(0.1) + + # Act + analytics = session.get_analytics() + + # Assert + assert isinstance(analytics, dict) + assert all(key in analytics for key in ["LLM calls", "Tool calls", "Actions", "Errors", "Duration", "Cost"]) + + # Check specific values + assert analytics["LLM calls"] == 1 + assert analytics["Tool calls"] == 1 + assert analytics["Actions"] == 1 + assert analytics["Errors"] == 1 + + # Check duration format + assert isinstance(analytics["Duration"], str) + assert "s" in analytics["Duration"] + + # Check cost format (mock returns token_cost: 5) + assert analytics["Cost"] == "5.000000" + + # End session and cleanup + session.end_session(end_state="Success") + agentops.end_all_sessions() + class TestMultiSessions: def setup_method(self): @@ -276,3 +313,49 @@ def test_add_tags(self, mock_req): "session-2", "session-2-added", ] + + def test_get_analytics_multiple_sessions(self, mock_req): + session_1 = agentops.start_session() + session_1.add_tags(["session-1", "test-analytics-tag"]) + session_2 = agentops.start_session() + session_2.add_tags(["session-2", "test-analytics-tag"]) + assert session_1 is not None + assert session_2 is not None + + # Record events in the sessions + session_1.record(ActionEvent("llms")) + session_1.record(ActionEvent("tools")) + session_2.record(ActionEvent("actions")) + session_2.record(ActionEvent("errors")) + + time.sleep(1.5) + + # Act + analytics_1 = session_1.get_analytics() + analytics_2 = session_2.get_analytics() + + # Assert 2 record_event requests - 2 for each session + assert analytics_1["LLM calls"] == 1 + assert analytics_1["Tool calls"] == 1 + assert analytics_1["Actions"] == 0 + assert analytics_1["Errors"] == 0 + + assert analytics_2["LLM calls"] == 0 + assert analytics_2["Tool calls"] == 0 + assert analytics_2["Actions"] == 1 + assert analytics_2["Errors"] == 1 + + # Check duration format + assert isinstance(analytics_1["Duration"], str) + assert "s" in analytics_1["Duration"] + assert isinstance(analytics_2["Duration"], str) + assert "s" in analytics_2["Duration"] + + # Check cost format (mock returns token_cost: 5) + assert analytics_1["Cost"] == "5.000000" + assert analytics_2["Cost"] == "5.000000" + + end_state = "Success" + + session_1.end_session(end_state) + session_2.end_session(end_state)