From e3ae1596060040942cf9177d688d15efbdb2e058 Mon Sep 17 00:00:00 2001 From: Shawn Qiu Date: Tue, 30 Jul 2024 17:56:25 -0700 Subject: [PATCH] Rework initialization, improve logging, and other refactoring (#315) --- agentops/__init__.py | 270 ++++++--- agentops/agent.py | 56 -- agentops/client.py | 536 +++++++---------- agentops/config.py | 209 +++---- agentops/decorators.py | 178 +++++- agentops/exceptions.py | 7 +- agentops/helpers.py | 21 +- agentops/http_client.py | 22 +- agentops/langchain_callback_handler.py | 727 ------------------------ agentops/log_config.py | 48 +- agentops/meta_client.py | 14 +- agentops/session.py | 160 ++++-- agentops/state.py | 9 - docs/v1/concepts/tags.mdx | 11 +- docs/v1/usage/environment-variables.mdx | 2 + pyproject.toml | 2 +- tach.yml | 114 ++-- tests/test_canary.py | 3 +- tests/test_events.py | 5 +- tests/test_record_function.py | 14 +- tests/test_session.py | 82 +-- tests/test_teardown.py | 5 +- 22 files changed, 912 insertions(+), 1583 deletions(-) delete mode 100644 agentops/agent.py delete mode 100644 agentops/langchain_callback_handler.py delete mode 100644 agentops/state.py diff --git a/agentops/__init__.py b/agentops/__init__.py index 5c7ca5cb7..54ca039b4 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -1,18 +1,12 @@ # agentops/__init__.py -import functools -import os -import logging +import sys from typing import Optional, List, Union from .client import Client -from .config import ClientConfiguration from .event import Event, ActionEvent, LLMEvent, ToolEvent, ErrorEvent -from .decorators import record_function -from .agent import track_agent +from .decorators import record_function, track_agent from .log_config import logger from .session import Session -from .state import get_state, set_state - try: from .partners.langchain_callback_handler import ( @@ -22,20 +16,13 @@ except ModuleNotFoundError: pass +if "autogen" in sys.modules: + Client().configure(instrument_llm_calls=False) + Client().add_default_tags(["autogen"]) -def noop(*args, **kwargs): - return - - -def check_init(child_function): - @functools.wraps(child_function) - def wrapper(*args, **kwargs): - if get_state("is_initialized"): # is initialized in state.py is not working - return child_function(*args, **kwargs) - else: - return noop(*args, **kwargs) - - return wrapper +if "crewai" in sys.modules: + Client().configure(instrument_llm_calls=False) + Client().add_default_tags(["crewai"]) def init( @@ -44,17 +31,17 @@ def init( endpoint: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, - tags: Optional[List[str]] = None, - instrument_llm_calls=True, - auto_start_session=True, + tags: Optional[List[str]] = None, # Deprecated + default_tags: Optional[List[str]] = None, + instrument_llm_calls: Optional[bool] = None, + auto_start_session: Optional[bool] = None, inherited_session_id: Optional[str] = None, - skip_auto_end_session: Optional[bool] = False, + skip_auto_end_session: Optional[bool] = None, ) -> Union[Session, None]: """ Initializes the AgentOps singleton pattern. Args: - api_key (str, optional): API Key for AgentOps services. If none is provided, key will be read from the AGENTOPS_API_KEY environment variable. parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. If none is provided, key will @@ -62,84 +49,93 @@ def init( endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. - Defaults to 30,000 (30 seconds) + Defaults to 5,000 (5 seconds) max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. - tags (List[str], optional): Tags for the sessions that can be used for grouping or - sorting later (e.g. ["GPT-4"]). + tags (List[str], optional): [Deprecated] Use `default_tags` instead. + default_tags (List[str], optional): Default tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]). instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents. auto_start_session (bool): Whether to start a session automatically when the client is created. inherited_session_id (optional, str): Init Agentops with an existing Session - skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making (i.e. Crew determining when tasks are complete and ending the session) + skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision-making + (i.e. Crew determining when tasks are complete and ending the session) Attributes: """ - logging_level = os.getenv("AGENTOPS_LOGGING_LEVEL") - log_levels = { - "CRITICAL": logging.CRITICAL, - "ERROR": logging.ERROR, - "INFO": logging.INFO, - "WARNING": logging.WARNING, - "DEBUG": logging.DEBUG, - } - logger.setLevel(log_levels.get(logging_level or "INFO", "INFO")) - - c = Client( + Client().unsuppress_logs() + if Client().is_initialized: + return logger.warning("AgentOps has already been initialized") + + if tags is not None: + logger.warning("The 'tags' parameter is deprecated. Use 'default_tags' instead") + if default_tags is None: + default_tags = tags + + Client().configure( api_key=api_key, parent_key=parent_key, endpoint=endpoint, max_wait_time=max_wait_time, max_queue_size=max_queue_size, - tags=tags, + default_tags=default_tags, instrument_llm_calls=instrument_llm_calls, - auto_start_session=False, # handled below - inherited_session_id=inherited_session_id, + auto_start_session=auto_start_session, skip_auto_end_session=skip_auto_end_session, ) - # handle auto_start_session here so we can get the session object to return rather than client above - # if the client automatically starts a session from a partner framework don't start a second - session = None - if auto_start_session and len(c.current_session_ids) == 0: - session = c.start_session( - tags=tags, config=c.config, inherited_session_id=inherited_session_id - ) + if inherited_session_id is not None: + if auto_start_session == False: + Client().add_pre_init_warning( + "auto_start_session is set to False - inherited_session_id will not be used to automatically start a session" + ) + return Client().initialize() - set_state("is_initialized", True) + Client().configure(auto_start_session=False) + Client().initialize() + return Client().start_session(inherited_session_id=inherited_session_id) - return session + return Client().initialize() -def end_session( - end_state: str, - end_state_reason: Optional[str] = None, - video: Optional[str] = None, - is_auto_end: Optional[bool] = False, +def configure( + api_key: Optional[str] = None, + parent_key: Optional[str] = None, + endpoint: Optional[str] = None, + max_wait_time: Optional[int] = None, + max_queue_size: Optional[int] = None, + default_tags: Optional[List[str]] = None, + instrument_llm_calls: Optional[bool] = None, + auto_start_session: Optional[bool] = None, + skip_auto_end_session: Optional[bool] = None, ): """ - End the current session with the AgentOps service. + Configure the AgentOps Client Args: - end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate. - end_state_reason (str, optional): The reason for ending the session. - video (str, optional): URL to a video recording of the session - is_auto_end (bool, optional): is this an automatic use of end_session and should be skipped with bypass_auto_end_session + api_key (str, optional): API Key for AgentOps services. + parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. + endpoint (str, optional): The endpoint for the AgentOps service. + max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. + max_queue_size (int, optional): The maximum size of the event queue + default_tags (List[str], optional): Default tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]). + instrument_llm_calls (bool, optional): Whether to instrument LLM calls and emit LLMEvents. + auto_start_session (bool, optional): Whether to start a session automatically when the client is created. + skip_auto_end_session (bool, optional): Don't automatically end session based on your framework's decision-making + (i.e. Crew determining when tasks are complete and ending the session) """ - Client().end_session( - end_state=end_state, - end_state_reason=end_state_reason, - video=video, - is_auto_end=is_auto_end, + Client().configure( + api_key=api_key, + parent_key=parent_key, + endpoint=endpoint, + max_wait_time=max_wait_time, + max_queue_size=max_queue_size, + default_tags=default_tags, + instrument_llm_calls=instrument_llm_calls, + auto_start_session=auto_start_session, + skip_auto_end_session=skip_auto_end_session, ) -# Mostly used for unit testing - -# prevents unexpected sessions on new tests -def end_all_sessions() -> None: - return Client().end_all_sessions() - - def start_session( tags: Optional[List[str]] = None, - config: Optional[ClientConfiguration] = None, inherited_session_id: Optional[str] = None, ) -> Union[Session, None]: """ @@ -148,21 +144,51 @@ def start_session( Args: tags (List[str], optional): Tags that can be used for grouping or sorting later. e.g. ["test_run"]. - config: (Configuration, optional): Client configuration object, inherited_session_id: (str, optional): Set the session ID to inherit from another client """ + Client().unsuppress_logs() + + if not Client().is_initialized: + return logger.warning( + "AgentOps has not been initialized yet. Please call agentops.init() before starting a session" + ) + + return Client().start_session(tags, inherited_session_id) + + +def end_session( + end_state: str, + end_state_reason: Optional[str] = None, + video: Optional[str] = None, + is_auto_end: Optional[bool] = False, +): + """ + End the current session with the AgentOps service. - try: - sess_result = Client().start_session(tags, config, inherited_session_id) + Args: + end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate. + end_state_reason (str, optional): The reason for ending the session. + video (str, optional): URL to a video recording of the session + """ + Client().unsuppress_logs() - set_state("is_initialized", True) + if Client().is_multi_session: + return logger.warning( + "Could not end session - multiple sessions detected. You must use session.end_session() instead of agentops.end_session()" + + " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" + ) - return sess_result - except Exception: - pass + if not Client().has_sessions: + return logger.warning("Could not end session - no sessions detected") + + Client().end_session( + end_state=end_state, + end_state_reason=end_state_reason, + video=video, + is_auto_end=is_auto_end, + ) -@check_init def record(event: Union[Event, ErrorEvent]): """ Record an event with the AgentOps service. @@ -170,6 +196,19 @@ def record(event: Union[Event, ErrorEvent]): Args: event (Event): The event to record. """ + Client().unsuppress_logs() + + if Client().is_multi_session: + return logger.warning( + "Could not record event - multiple sessions detected. You must use session.record() instead of agentops.record()" + + " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" + ) + + if not Client().has_sessions: + return logger.warning( + "Could not record event - no sessions detected. Create a session by calling agentops.start_session()" + ) + Client().record(event) @@ -180,6 +219,17 @@ def add_tags(tags: List[str]): Args: tags (List[str]): The list of tags to append. """ + if Client().is_multi_session: + return logger.warning( + "Could not add tags to session - multiple sessions detected. You must use session.add_tags() instead of agentops.add_tags()" + + " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" + ) + + if not Client().has_sessions: + return logger.warning( + "Could not add tags to session - no sessions detected. Create a session by calling agentops.start_session()" + ) + Client().add_tags(tags) @@ -190,27 +240,71 @@ def set_tags(tags: List[str]): Args: tags (List[str]): The list of tags to set. """ + if Client().is_multi_session: + return logger.warning( + "Could not set tags on session - multiple sessions detected. You must use session.set_tags() instead of agentops.set_tags()" + + " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" + ) + + if not Client().has_sessions: + return logger.warning( + "Could not set tags on session - no sessions detected. Create a session by calling agentops.start_session()" + ) + Client().set_tags(tags) -def get_api_key() -> str: +def get_api_key() -> Union[str, None]: return Client().api_key -def set_parent_key(parent_key): +def set_api_key(api_key: str) -> None: + Client().configure(api_key=api_key) + + +def set_parent_key(parent_key: str): """ Set the parent API key so another organization can view data. Args: parent_key (str): The API key of the parent organization to set. """ - Client().set_parent_key(parent_key) + Client().configure(parent_key=parent_key) def stop_instrumenting(): - Client().stop_instrumenting() + if Client().is_initialized: + Client().stop_instrumenting() -@check_init def create_agent(name: str, agent_id: Optional[str] = None): + if Client().is_multi_session: + return logger.warning( + "Could not create agent - multiple sessions detected. You must use session.create_agent() instead of agentops.create_agent()" + + " More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" + ) + + if not Client().has_sessions: + return logger.warning( + "Could not create agent - no sessions detected. Create a session by calling agentops.start_session()" + ) + return Client().create_agent(name=name, agent_id=agent_id) + + +def get_session(session_id: str): + """ + Get an active (not ended) session from the AgentOps service + + Args: + session_id (str): the session id for the session to be retreived + """ + Client().unsuppress_logs() + + return Client().get_session(session_id) + + +# Mostly used for unit testing - +# prevents unexpected sessions on new tests +def end_all_sessions() -> None: + return Client().end_all_sessions() diff --git a/agentops/agent.py b/agentops/agent.py deleted file mode 100644 index 8ed411742..000000000 --- a/agentops/agent.py +++ /dev/null @@ -1,56 +0,0 @@ -from typing import Union - -from .log_config import logger -from uuid import uuid4 -from agentops import Client -from inspect import isclass, isfunction -from .state import get_state - - -def track_agent(name: Union[str, None] = None): - def decorator(obj): - if name: - obj.agent_ops_agent_name = name - - if isclass(obj): - original_init = obj.__init__ - - def new_init(self, *args, **kwargs): - try: - original_init(self, *args, **kwargs) - - if not get_state("is_initialized"): - return - - self.agent_ops_agent_id = str(uuid4()) - - session = kwargs.get("session", None) - if session is not None: - self.agent_ops_session_id = session.session_id - - Client().create_agent( - name=self.agent_ops_agent_name, - agent_id=self.agent_ops_agent_id, - session=session, - ) - except AttributeError as e: - logger.warning( - "Failed to track an agent. This often happens if agentops.init() was not " - "called before initializing an agent with the @track_agent decorator." - ) - raise e - - obj.__init__ = new_init - - elif isfunction(obj): - obj.agent_ops_agent_id = str(uuid4()) - Client().create_agent( - name=obj.agent_ops_agent_name, agent_id=obj.agent_ops_agent_id - ) - - else: - raise Exception("Invalid input, 'obj' must be a class or a function") - - return obj - - return decorator diff --git a/agentops/client.py b/agentops/client.py index c01a6c443..4007e4102 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -5,156 +5,116 @@ Client: Provides methods to interact with the AgentOps service. """ -import os import inspect import atexit +import logging +import os import signal import sys import threading import traceback -import logging -from decimal import Decimal, ROUND_HALF_UP +from decimal import Decimal from uuid import UUID, uuid4 -from typing import Optional, List, Union +from typing import Optional, List, Union, Tuple +from termcolor import colored -from .event import ActionEvent, ErrorEvent, Event -from .enums import EndState -from .exceptions import NoSessionException, MultiSessionException, ConfigurationError +from .event import Event, ErrorEvent from .helpers import ( - get_ISO_time, - check_call_stack_for_agent_id, - get_partner_frameworks, conditional_singleton, ) -from .session import Session +from .session import Session, active_sessions from .host_env import get_host_env from .log_config import logger from .meta_client import MetaClient -from .config import ClientConfiguration +from .config import Configuration from .llm_tracker import LlmTracker -from termcolor import colored -from typing import Tuple -from .state import get_state, set_state @conditional_singleton class Client(metaclass=MetaClient): - """ - Client for AgentOps service. - - Args: - - api_key (str, optional): API Key for AgentOps services. If none is provided, key will - be read from the AGENTOPS_API_KEY environment variable. - parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. - If none is provided, key will be read from the AGENTOPS_PARENT_KEY environment variable. - endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will - be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. - max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. - Defaults to 30,000 (30 seconds) - max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. - tags (List[str], optional): Tags for the sessions that can be used for grouping or - sorting later (e.g. ["GPT-4"]). - override (bool, optional): [Deprecated] Use `instrument_llm_calls` instead. Whether to instrument LLM calls - and emit LLMEvents. - instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents.. - auto_start_session (bool): Whether to start a session automatically when the client is created. - inherited_session_id (optional, str): Init Agentops with an existing Session - skip_auto_end_session (optional, bool): Don't automatically end session based on your framework's decision making - Attributes: - _session (Session, optional): A Session is a grouping of events (e.g. a run of your agent). - """ - - def __init__( + def __init__(self): + self._pre_init_messages: List[str] = [] + self._initialized: bool = False + self._llm_tracker: Optional[LlmTracker] = None + self._sessions: List[Session] = active_sessions + self._config = Configuration() + + self.configure( + api_key=os.environ.get("AGENTOPS_API_KEY"), + parent_key=os.environ.get("AGENTOPS_PARENT_KEY"), + endpoint=os.environ.get("AGENTOPS_API_ENDPOINT"), + env_data_opt_out=os.environ.get( + "AGENTOPS_ENV_DATA_OPT_OUT", "False" + ).lower() + == "true", + ) + + def configure( self, api_key: Optional[str] = None, parent_key: Optional[str] = None, endpoint: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, - tags: Optional[List[str]] = None, - override: Optional[bool] = None, # Deprecated - instrument_llm_calls=True, - auto_start_session=False, - inherited_session_id: Optional[str] = None, - skip_auto_end_session: Optional[bool] = False, + default_tags: Optional[List[str]] = None, + instrument_llm_calls: Optional[bool] = None, + auto_start_session: Optional[bool] = None, + skip_auto_end_session: Optional[bool] = None, + env_data_opt_out: Optional[bool] = None, ): - if override is not None: - logger.warning( - "The 'override' parameter is deprecated. Use 'instrument_llm_calls' instead.", - DeprecationWarning, - stacklevel=2, + if self.has_sessions: + return logger.warning( + f"{len(self._sessions)} session(s) in progress. Configuration is locked until there are no more sessions running" ) - instrument_llm_calls = instrument_llm_calls or override - self._sessions: Optional[List[Session]] = [] - self._tags: Optional[List[str]] = tags - self._tags_for_future_session: Optional[List[str]] = None - - self._env_data_opt_out = ( - os.environ.get("AGENTOPS_ENV_DATA_OPT_OUT", "False").lower() == "true" + self._config.configure( + self, + api_key=api_key, + parent_key=parent_key, + endpoint=endpoint, + max_wait_time=max_wait_time, + max_queue_size=max_queue_size, + default_tags=default_tags, + instrument_llm_calls=instrument_llm_calls, + auto_start_session=auto_start_session, + skip_auto_end_session=skip_auto_end_session, + env_data_opt_out=env_data_opt_out, ) - self.llm_tracker = None - - self.config = None - try: - self.config = ClientConfiguration( - api_key=api_key, - parent_key=parent_key, - endpoint=endpoint, - max_wait_time=max_wait_time, - max_queue_size=max_queue_size, - skip_auto_end_session=skip_auto_end_session, + def initialize(self) -> Union[Session, None]: + self.unsuppress_logs() + + if self._config.api_key is None: + return logger.error( + "Could not initialize AgentOps client - API Key is missing." + + "\n\t Find your API key at https://app.agentops.ai/settings/projects" ) - if inherited_session_id is not None: - # Check if inherited_session_id is valid - UUID(inherited_session_id) + self._handle_unclean_exits() + self._initialize_partner_framework() - except ConfigurationError: - logger.warning("Failed to setup client Configuration") - return + self._initialized = True - self._handle_unclean_exits() + if self._config.instrument_llm_calls: + self._llm_tracker = LlmTracker(self) + self._llm_tracker.override_api() - instrument_llm_calls, auto_start_session = self._check_for_partner_frameworks( - instrument_llm_calls, auto_start_session - ) + session = None + if self._config.auto_start_session: + session = self.start_session() - if auto_start_session: - self.start_session(tags, self.config, inherited_session_id) - else: - self._tags_for_future_session = tags - - if instrument_llm_calls: - self.llm_tracker = LlmTracker(self) - self.llm_tracker.override_api() - - def _check_for_partner_frameworks( - self, instrument_llm_calls, auto_start_session - ) -> Tuple[bool, bool]: - partner_frameworks = get_partner_frameworks() - for framework in partner_frameworks.keys(): - if framework in sys.modules: - self.add_tags([framework]) - if framework == "autogen": - try: - import autogen - from .partners.autogen_logger import AutogenLogger - - autogen.runtime_logging.start(logger=AutogenLogger()) - self.add_tags(["autogen"]) - except ImportError: - pass - except Exception as e: - logger.warning( - f"Failed to set up AutoGen logger with AgentOps. Error: {e}" - ) - - return partner_frameworks[framework] - - return instrument_llm_calls, auto_start_session + return session + + def _initialize_partner_framework(self) -> None: + try: + import autogen + from .partners.autogen_logger import AutogenLogger + + autogen.runtime_logging.start(logger=AutogenLogger()) + except ImportError: + pass + except Exception as e: + logger.warning(f"Failed to set up AutoGen logger with AgentOps. Error: {e}") def add_tags(self, tags: List[str]) -> None: """ @@ -163,25 +123,19 @@ def add_tags(self, tags: List[str]) -> None: Args: tags (List[str]): The list of tags to append. """ + if not self.is_initialized: + return # if a string and not a list of strings if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): if isinstance(tags, str): # if it's a single string tags = [tags] # make it a list - if len(self._sessions) == 0: - if self._tags_for_future_session: - for tag in tags: - if tag not in self._tags_for_future_session: - self._tags_for_future_session.append(tag) - else: - self._tags_for_future_session = tags - - return - session = self._safe_get_session() if session is None: - return + return logger.warning( + "Could not add tags. Start a session by calling agentops.start_session()." + ) session.add_tags(tags=tags) @@ -194,14 +148,35 @@ def set_tags(self, tags: List[str]) -> None: Args: tags (List[str]): The list of tags to set. """ + if not self.is_initialized: + return session = self._safe_get_session() if session is None: - self._tags_for_future_session = tags - return + return logger.warning( + "Could not set tags. Start a session by calling agentops.start_session()." + ) + else: + session.set_tags(tags=tags) + + def add_default_tags(self, tags: List[str]) -> None: + """ + Append default tags at runtime. + + Args: + tags (List[str]): The list of tags to set. + """ + self._config.default_tags.update(tags) + + def get_default_tags(self) -> List[str]: + """ + Append default tags at runtime. - session.set_tags(tags=tags) + Args: + tags (List[str]): The list of tags to set. + """ + return list(self._config.default_tags) def record(self, event: Union[Event, ErrorEvent]) -> None: """ @@ -210,131 +185,19 @@ def record(self, event: Union[Event, ErrorEvent]) -> None: Args: event (Event): The event to record. """ + if not self.is_initialized: + return session = self._safe_get_session() if session is None: - logger.error("Could not record event. No session.") - return + return logger.error( + "Could not record event. Start a session by calling agentops.start_session()." + ) session.record(event) - def _record_event_sync(self, func, event_name, *args, **kwargs): - init_time = get_ISO_time() - session: Optional[Session] = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - if session is None: - if len(Client().current_session_ids) > 1: - raise ValueError( - "If multiple sessions exists, `session` is a required parameter in the function decorated by @record_function" - ) - func_args = inspect.signature(func).parameters - arg_names = list(func_args.keys()) - # Get default values - arg_values = { - name: func_args[name].default - for name in arg_names - if func_args[name].default is not inspect._empty - } - # Update with positional arguments - arg_values.update(dict(zip(arg_names, args))) - arg_values.update(kwargs) - - event = ActionEvent( - params=arg_values, - init_timestamp=init_time, - agent_id=check_call_stack_for_agent_id(), - action_type=event_name, - ) - - try: - returns = func(*args, **kwargs) - - # If the function returns multiple values, record them all in the same event - if isinstance(returns, tuple): - returns = list(returns) - - event.returns = returns - - if hasattr(returns, "screenshot"): - event.screenshot = returns.screenshot - - event.end_timestamp = get_ISO_time() - - if session: - session.record(event) - else: - self.record(event) - - except Exception as e: - self.record(ErrorEvent(trigger_event=event, exception=e)) - - # Re-raise the exception - raise - - return returns - - async def _record_event_async(self, func, event_name, *args, **kwargs): - init_time = get_ISO_time() - session: Union[Session, None] = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - if session is None: - if len(Client().current_session_ids) > 1: - raise ValueError( - "If multiple sessions exists, `session` is a required parameter in the function decorated by @record_function" - ) - func_args = inspect.signature(func).parameters - arg_names = list(func_args.keys()) - # Get default values - arg_values = { - name: func_args[name].default - for name in arg_names - if func_args[name].default is not inspect._empty - } - # Update with positional arguments - arg_values.update(dict(zip(arg_names, args))) - arg_values.update(kwargs) - - event = ActionEvent( - params=arg_values, - init_timestamp=init_time, - agent_id=check_call_stack_for_agent_id(), - action_type=event_name, - ) - - try: - returns = await func(*args, **kwargs) - - # If the function returns multiple values, record them all in the same event - if isinstance(returns, tuple): - returns = list(returns) - - event.returns = returns - - # NOTE: Will likely remove in future since this is tightly coupled. Adding it to see how useful we find it for now - # TODO: check if screenshot is the url string we expect it to be? And not e.g. "True" - if hasattr(returns, "screenshot"): - event.screenshot = returns.screenshot - - event.end_timestamp = get_ISO_time() - - if session: - session.record(event) - else: - self.record(event) - - except Exception as e: - self.record(ErrorEvent(trigger_event=event, exception=e)) - - # Re-raise the exception - raise - - return returns - def start_session( self, tags: Optional[List[str]] = None, - config: Optional[ClientConfiguration] = None, inherited_session_id: Optional[str] = None, ) -> Union[Session, None]: """ @@ -346,34 +209,30 @@ def start_session( config: (Configuration, optional): Client configuration object inherited_session_id (optional, str): assign session id to match existing Session """ - logging_level = os.getenv("AGENTOPS_LOGGING_LEVEL") - log_levels = { - "CRITICAL": logging.CRITICAL, - "ERROR": logging.ERROR, - "INFO": logging.INFO, - "WARNING": logging.WARNING, - "DEBUG": logging.DEBUG, - } - logger.setLevel(log_levels.get(logging_level or "INFO", "INFO")) + if not self.is_initialized: + return - if not config and not self.config: - return logger.warning( - "Cannot start session - missing configuration - did you call init()?" - ) + if inherited_session_id is not None: + try: + session_id = UUID(inherited_session_id) + except ValueError: + return logger.warning(f"Invalid session id: {inherited_session_id}") + else: + session_id = uuid4() - session_id = ( - UUID(inherited_session_id) if inherited_session_id is not None else uuid4() - ) + session_tags = self._config.default_tags.copy() + if tags is not None: + session_tags.update(tags) session = Session( session_id=session_id, - tags=tags or self._tags_for_future_session, - host_env=get_host_env(self._env_data_opt_out), - config=config or self.config, + tags=list(session_tags), + host_env=get_host_env(self._config.env_data_opt_out), + config=self._config, ) - if not session: - return logger.warning("Cannot start session - server rejected session") + if not session.is_running: + return logger.error("Failed to start session") logger.info( colored( @@ -404,54 +263,17 @@ def end_session( Returns: Decimal: The token cost of the session. Returns 0 if the cost is unknown. """ - session = self._safe_get_session() if session is None: return - session.end_state = end_state - session.end_state_reason = end_state_reason - - if is_auto_end and self.config.skip_auto_end_session: + if is_auto_end and self._config.skip_auto_end_session: return - if not any(end_state == state.value for state in EndState): - return logger.warning( - "Invalid end_state. Please use one of the EndState enums" - ) - - session.video = video - - if not session.end_timestamp: - session.end_timestamp = get_ISO_time() - - token_cost = session.end_session(end_state=end_state) - - if token_cost == "unknown" or token_cost is None: - logger.info("Could not determine cost of run.") - token_cost_d = Decimal(0) - else: - token_cost_d = Decimal(token_cost) - logger.info( - "This run's cost ${}".format( - "{:.2f}".format(token_cost_d) - if token_cost_d == 0 - else "{:.6f}".format( - token_cost_d.quantize( - Decimal("0.000001"), rounding=ROUND_HALF_UP - ) - ) - ) - ) - - logger.info( - colored( - f"\x1b[34mSession Replay: https://app.agentops.ai/drilldown?session_id={session.session_id}\x1b[0m", - "blue", - ) + token_cost = session.end_session( + end_state=end_state, end_state_reason=end_state_reason, video=video ) - self._sessions.remove(session) - return token_cost_d + return token_cost def create_agent( self, @@ -533,30 +355,12 @@ def handle_exception(exc_type, exc_value, exc_traceback): signal.signal(signal.SIGTERM, signal_handler) sys.excepthook = handle_exception - @property - def current_session_ids(self) -> List[str]: - return [str(s.session_id) for s in self._sessions] - - @property - def api_key(self): - return self.config.api_key - - def set_parent_key(self, parent_key: str): - """ - Set the parent API key which has visibility to projects it is parent to. - - Args: - parent_key (str): The API key of the parent organization to set. - """ - self.config.parent_key = parent_key - - @property - def parent_key(self): - return self.config.parent_key - def stop_instrumenting(self): - if self.llm_tracker: - self.llm_tracker.stop_instrumenting() + if self._llm_tracker is not None: + self._llm_tracker.stop_instrumenting() + + def add_pre_init_warning(self, message: str): + self._pre_init_messages.append(message) # replaces the session currently stored with a specific session_id, with a new session def _update_session(self, session: Session): @@ -571,32 +375,76 @@ def _update_session(self, session: Session): ] = session def _safe_get_session(self) -> Optional[Session]: - for s in self._sessions: - if s.end_state is not None: - self._sessions.remove(s) - - session = None + if not self.is_initialized: + return None if len(self._sessions) == 1: - session = self._sessions[0] + return self._sessions[0] - if len(self._sessions) == 0: - if get_state("is_initialized"): - return None - - elif len(self._sessions) > 1: + if len(self._sessions) > 1: calling_function = inspect.stack()[ 2 ].function # Using index 2 because we have a wrapper at index 1 - logger.warning( + return logger.warning( f"Multiple sessions detected. You must use session.{calling_function}(). More info: https://docs.agentops.ai/v1/concepts/core-concepts#session-management" ) - return + return None - return session + def get_session(self, session_id: str): + """ + Get an active (not ended) session from the AgentOps service + + Args: + session_id (str): the session id for the session to be retreived + """ + for session in self._sessions: + if session.session_id == session_id: + return session + + def unsuppress_logs(self): + logging_level = os.getenv("AGENTOPS_LOGGING_LEVEL", "INFO") + log_levels = { + "CRITICAL": logging.CRITICAL, + "ERROR": logging.ERROR, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "DEBUG": logging.DEBUG, + } + logger.setLevel(log_levels.get(logging_level, "INFO")) + + for message in self._pre_init_messages: + logger.warning(message) def end_all_sessions(self): for s in self._sessions: s.end_session() self._sessions.clear() + + @property + def is_initialized(self) -> bool: + return self._initialized + + @property + def has_sessions(self) -> bool: + return len(self._sessions) > 0 + + @property + def is_multi_session(self) -> bool: + return len(self._sessions) > 1 + + @property + def session_count(self) -> int: + return len(self._sessions) + + @property + def current_session_ids(self) -> List[str]: + return [str(s.session_id) for s in self._sessions] + + @property + def api_key(self): + return self._config.api_key + + @property + def parent_key(self): + return self._config.parent_key diff --git a/agentops/config.py b/agentops/config.py index 40e68be15..d14c85023 100644 --- a/agentops/config.py +++ b/agentops/config.py @@ -1,149 +1,74 @@ -""" -AgentOps Client configuration. - -Classes: - ClientConfiguration: Stores the configuration settings for AgentOps clients. -""" - -from typing import Optional -from os import environ - -from .exceptions import ConfigurationError - - -class ClientConfiguration: - """ - Stores the configuration settings for AgentOps clients. - - Args: - api_key (str, optional): API Key for AgentOps services. If none is provided, key will - be read from the AGENTOPS_API_KEY environment variable. - parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. If none is provided, key will - be read from the AGENTOPS_PARENT_KEY environment variable. - endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will - be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. - max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. Defaults to 5000. - max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. - """ - - def __init__( +from typing import List, Optional +from uuid import UUID + +from .log_config import logger + + +class Configuration: + def __init__(self): + self.api_key: Optional[str] = None + self.parent_key: Optional[str] = None + self.endpoint: str = "https://api.agentops.ai" + self.max_wait_time: int = 5000 + self.max_queue_size: int = 100 + self.default_tags: set[str] = set() + self.instrument_llm_calls: bool = True + self.auto_start_session: bool = True + self.skip_auto_end_session: bool = False + self.env_data_opt_out: bool = False + + def configure( self, + client, api_key: Optional[str] = None, parent_key: Optional[str] = None, endpoint: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, - skip_auto_end_session: Optional[bool] = False, + default_tags: Optional[List[str]] = None, + instrument_llm_calls: Optional[bool] = None, + auto_start_session: Optional[bool] = None, + skip_auto_end_session: Optional[bool] = None, + env_data_opt_out: Optional[bool] = None, ): - - if not api_key: - api_key = environ.get("AGENTOPS_API_KEY", None) - if not api_key: - raise ConfigurationError( - "No API key provided - no data will be recorded." - ) - - if not parent_key: - parent_key = environ.get("AGENTOPS_PARENT_KEY", None) - - if not endpoint: - endpoint = environ.get("AGENTOPS_API_ENDPOINT", "https://api.agentops.ai") - - self._api_key: str = api_key - self._endpoint = endpoint - self._max_wait_time = max_wait_time or 5000 - self._max_queue_size = max_queue_size or 100 - self._parent_key: Optional[str] = parent_key - self._skip_auto_end_session: Optional[bool] = skip_auto_end_session - - @property - def api_key(self) -> str: - """ - Get the API Key for AgentOps services. - - Returns: - str: The API Key for AgentOps services. - """ - return self._api_key - - @api_key.setter - def api_key(self, value: str): - """ - Set the API Key for AgentOps services. - - Args: - value (str): The new API Key. - """ - self._api_key = value - - @property - def endpoint(self) -> str: - """ - Get the endpoint for the AgentOps service. - - Returns: - str: The endpoint for the AgentOps service. - """ - return self._endpoint # type: ignore - - @endpoint.setter - def endpoint(self, value: str): - """ - Set the endpoint for the AgentOps service. - - Args: - value (str): The new endpoint. - """ - self._endpoint = value - - @property - def max_wait_time(self) -> int: - """ - Get the maximum wait time for the AgentOps service. - - Returns: - int: The maximum wait time. - """ - return self._max_wait_time - - @max_wait_time.setter - def max_wait_time(self, value: int): - """ - Set the maximum wait time for the AgentOps service. - - Args: - value (int): The new maximum wait time. - """ - self._max_wait_time = value - - @property - def max_queue_size(self) -> int: - """ - Get the maximum size of the event queue. - - Returns: - int: The maximum size of the event queue. - """ - return self._max_queue_size - - @max_queue_size.setter - def max_queue_size(self, value: int): - """ - Set the maximum size of the event queue. - - Args: - value (int): The new maximum size of the event queue. - """ - self._max_queue_size = value - - @property - def parent_key(self): - return self._parent_key - - @property - def skip_auto_end_session(self): - return self._skip_auto_end_session - - @parent_key.setter - def parent_key(self, value: str): - self._parent_key = value + if api_key is not None: + try: + UUID(api_key) + self.api_key = api_key + except ValueError: + message = f"API Key is invalid: {{{api_key}}}.\n\t Find your API key at https://app.agentops.ai/settings/projects" + client.add_pre_init_warning(message) + logger.error(message) + + if parent_key is not None: + try: + UUID(parent_key) + self.parent_key = parent_key + except ValueError: + message = f"Parent Key is invalid: {parent_key}" + client.add_pre_init_warning(message) + logger.warning(message) + + if endpoint is not None: + self.endpoint = endpoint + + if max_wait_time is not None: + self.max_wait_time = max_wait_time + + if max_queue_size is not None: + self.max_queue_size = max_queue_size + + if default_tags is not None: + self.default_tags.update(default_tags) + + if instrument_llm_calls is not None: + self.instrument_llm_calls = instrument_llm_calls + + if auto_start_session is not None: + self.auto_start_session = auto_start_session + + if skip_auto_end_session is not None: + self.skip_auto_end_session = skip_auto_end_session + + if env_data_opt_out is not None: + self.env_data_opt_out = env_data_opt_out diff --git a/agentops/decorators.py b/agentops/decorators.py index b0517f5e5..0ea961e84 100644 --- a/agentops/decorators.py +++ b/agentops/decorators.py @@ -1,9 +1,13 @@ -import agentops -from .session import Session -from .client import Client import inspect import functools -from typing import Optional +from typing import Optional, Union +from uuid import uuid4 + +from .event import ActionEvent, ErrorEvent +from .helpers import check_call_stack_for_agent_id, get_ISO_time +from .session import Session +from .client import Client +from .log_config import logger def record_function(event_name: str): @@ -22,19 +26,177 @@ def decorator(func): @functools.wraps(func) async def async_wrapper(*args, session: Optional[Session] = None, **kwargs): - return await Client()._record_event_async( - func, event_name, *args, session=session, **kwargs + init_time = get_ISO_time() + if "session" in kwargs.keys(): + del kwargs["session"] + if session is None: + if Client().is_multi_session: + raise ValueError( + "If multiple sessions exists, `session` is a required parameter in the function decorated by @record_function" + ) + func_args = inspect.signature(func).parameters + arg_names = list(func_args.keys()) + # Get default values + arg_values = { + name: func_args[name].default + for name in arg_names + if func_args[name].default is not inspect._empty + } + # Update with positional arguments + arg_values.update(dict(zip(arg_names, args))) + arg_values.update(kwargs) + + event = ActionEvent( + params=arg_values, + init_timestamp=init_time, + agent_id=check_call_stack_for_agent_id(), + action_type=event_name, ) + try: + returns = await func(*args, **kwargs) + + # If the function returns multiple values, record them all in the same event + if isinstance(returns, tuple): + returns = list(returns) + + event.returns = returns + + # NOTE: Will likely remove in future since this is tightly coupled. Adding it to see how useful we find it for now + # TODO: check if screenshot is the url string we expect it to be? And not e.g. "True" + if hasattr(returns, "screenshot"): + event.screenshot = returns.screenshot # type: ignore + + event.end_timestamp = get_ISO_time() + + if session: + session.record(event) + else: + Client().record(event) + + except Exception as e: + Client().record(ErrorEvent(trigger_event=event, exception=e)) + + # Re-raise the exception + raise + + return returns + return async_wrapper else: @functools.wraps(func) def sync_wrapper(*args, session: Optional[Session] = None, **kwargs): - return Client()._record_event_sync( - func, event_name, *args, session=session, **kwargs + init_time = get_ISO_time() + if "session" in kwargs.keys(): + del kwargs["session"] + if session is None: + if Client().is_multi_session: + raise ValueError( + "If multiple sessions exists, `session` is a required parameter in the function decorated by @record_function" + ) + func_args = inspect.signature(func).parameters + arg_names = list(func_args.keys()) + # Get default values + arg_values = { + name: func_args[name].default + for name in arg_names + if func_args[name].default is not inspect._empty + } + # Update with positional arguments + arg_values.update(dict(zip(arg_names, args))) + arg_values.update(kwargs) + + event = ActionEvent( + params=arg_values, + init_timestamp=init_time, + agent_id=check_call_stack_for_agent_id(), + action_type=event_name, ) + try: + returns = func(*args, **kwargs) + + # If the function returns multiple values, record them all in the same event + if isinstance(returns, tuple): + returns = list(returns) + + event.returns = returns + + if hasattr(returns, "screenshot"): + event.screenshot = returns.screenshot # type: ignore + + event.end_timestamp = get_ISO_time() + + if session: + session.record(event) + else: + Client().record(event) + + except Exception as e: + Client().record(ErrorEvent(trigger_event=event, exception=e)) + + # Re-raise the exception + raise + + return returns + return sync_wrapper return decorator + + +def track_agent(name: Union[str, None] = None): + def decorator(obj): + if name: + obj.agent_ops_agent_name = name + + if inspect.isclass(obj): + original_init = obj.__init__ + + def new_init(self, *args, **kwargs): + try: + original_init(self, *args, **kwargs) + + if not Client().is_initialized: + Client().add_pre_init_warning( + f"Failed to track an agent {name} because agentops.init() was not " + + "called before initializing the agent with the @track_agent decorator." + ) + + self.agent_ops_agent_id = str(uuid4()) + + session = kwargs.get("session", None) + if session is not None: + self.agent_ops_session_id = session.session_id + + Client().create_agent( + name=self.agent_ops_agent_name, + agent_id=self.agent_ops_agent_id, + session=session, + ) + except AttributeError as e: + Client().add_pre_init_warning( + f"Failed to track an agent {name} because agentops.init() was not " + + "called before initializing the agent with the @track_agent decorator." + ) + logger.warning( + "Failed to track an agent. This often happens if agentops.init() was not " + "called before initializing an agent with the @track_agent decorator." + ) + original_init(self, *args, **kwargs) + + obj.__init__ = new_init + + elif inspect.isfunction(obj): + obj.agent_ops_agent_id = str(uuid4()) # type: ignore + Client().create_agent( + name=obj.agent_ops_agent_name, agent_id=obj.agent_ops_agent_id # type: ignore + ) + + else: + raise Exception("Invalid input, 'obj' must be a class or a function") + + return obj + + return decorator diff --git a/agentops/exceptions.py b/agentops/exceptions.py index 6cd3ce789..9a6d0b76e 100644 --- a/agentops/exceptions.py +++ b/agentops/exceptions.py @@ -11,9 +11,6 @@ def __init__(self, message): super().__init__(message) -class ConfigurationError(Exception): - """Exception raised for errors related to Configuration""" - - def __init__(self, message: str): +class ApiServerException(Exception): + def __init__(self, message): super().__init__(message) - logger.warning(message) diff --git a/agentops/helpers.py b/agentops/helpers.py index 5c3dc437f..22eece626 100644 --- a/agentops/helpers.py +++ b/agentops/helpers.py @@ -1,23 +1,14 @@ from pprint import pformat from functools import wraps -import time -from datetime import datetime +from datetime import datetime, timezone import json import inspect from typing import Union from .log_config import logger from uuid import UUID -import os from importlib.metadata import version - -PARTNER_FRAMEWORKS = { - # framework : instrument_llm_calls, auto_start_session - "autogen": (False, True), - "crewai": (False, True), -} - ao_instances = {} @@ -52,14 +43,12 @@ def clear_singletons(): def get_ISO_time(): """ - Get the current UTC time in ISO 8601 format with milliseconds precision, suffixed with 'Z' to denote UTC timezone. + Get the current UTC time in ISO 8601 format with milliseconds precision in UTC timezone. Returns: str: The current UTC time as a string in ISO 8601 format. """ - return ( - datetime.utcfromtimestamp(time.time()).isoformat(timespec="milliseconds") + "Z" - ) + return datetime.now(timezone.utc).isoformat() def is_jsonable(x): @@ -197,7 +186,3 @@ def wrapper(self, *args, **kwargs): return func(self, *args, **kwargs) return wrapper - - -def get_partner_frameworks(): - return PARTNER_FRAMEWORKS diff --git a/agentops/http_client.py b/agentops/http_client.py index cbeb8f4c7..a625cd192 100644 --- a/agentops/http_client.py +++ b/agentops/http_client.py @@ -1,8 +1,9 @@ from enum import Enum from typing import Optional -from .log_config import logger -import requests from requests.adapters import Retry, HTTPAdapter +import requests + +from .exceptions import ApiServerException JSON_HEADER = {"Content-Type": "application/json; charset=UTF-8", "Accept": "*/*"} @@ -89,7 +90,9 @@ def post( except requests.exceptions.Timeout: result.code = 408 result.status = HttpStatus.TIMEOUT - logger.warning("Could not post data - connection timed out") + raise ApiServerException( + "Could not reach API server - connection timed out" + ) except requests.exceptions.HTTPError as e: try: result.parse(e.response) @@ -98,16 +101,21 @@ def post( result.code = e.response.status_code result.status = Response.get_status(e.response.status_code) result.body = {"error": str(e)} + raise ApiServerException(f"HTTPError: {e}") except requests.exceptions.RequestException as e: result.body = {"error": str(e)} + raise ApiServerException(f"RequestException: {e}") if result.code == 401: - logger.warning( - "Could not post data - API server rejected your API key: %s", api_key + raise ApiServerException( + f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects" ) if result.code == 400: - logger.warning("Could not post data - %s", result.body) + if "message" in result.body: + raise ApiServerException(f"API server: {result.body['message']}") + else: + raise ApiServerException(f"API server: {result.body}") if result.code == 500: - logger.warning("Could not post data - internal server error") + raise ApiServerException("API server: - internal server error") return result diff --git a/agentops/langchain_callback_handler.py b/agentops/langchain_callback_handler.py deleted file mode 100644 index 4e3e51b9c..000000000 --- a/agentops/langchain_callback_handler.py +++ /dev/null @@ -1,727 +0,0 @@ -from typing import Dict, Any, List, Optional, Sequence, Union -from collections import defaultdict -from uuid import UUID - -from langchain_core.agents import AgentFinish, AgentAction -from langchain_core.documents import Document -from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult -from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler -from langchain_core.messages import BaseMessage - -from tenacity import RetryCallState - -from agentops import Client as AOClient -from agentops import ActionEvent, LLMEvent, ToolEvent, ErrorEvent -from agentops.helpers import get_ISO_time -import logging - -from .helpers import debug_print_function_params - - -def get_model_from_kwargs(kwargs: any) -> str: - if "model" in kwargs["invocation_params"]: - return kwargs["invocation_params"]["model"] - elif "_type" in kwargs["invocation_params"]: - return kwargs["invocation_params"]["_type"] - else: - return "unknown_model" - - -# def get_completion_from_response(response: LLMResult): -# if 'text' in response.generations[0][0]: -# return response.generations[0][0].text -# if '' -# - - -class Events: - llm: Dict[str, LLMEvent] = {} - tool: Dict[str, ToolEvent] = {} - chain: Dict[str, ActionEvent] = {} - retriever: Dict[str, ActionEvent] = {} - error: Dict[str, ErrorEvent] = {} - - -class LangchainCallbackHandler(BaseCallbackHandler): - """Callback handler for Langchain agents.""" - - def __init__( - self, - api_key: Optional[str] = None, - endpoint: Optional[str] = None, - max_wait_time: Optional[int] = None, - max_queue_size: Optional[int] = None, - tags: Optional[List[str]] = None, - ): - - logging.warning( - "🚨Importing the Langchain Callback Handler from here is deprecated. Please import with " - "`from agentops.partners import LangchainCallbackHandler`" - ) - - client_params: Dict[str, Any] = { - "api_key": api_key, - "endpoint": endpoint, - "max_wait_time": max_wait_time, - "max_queue_size": max_queue_size, - "tags": tags, - } - - self.ao_client = AOClient( - **{k: v for k, v in client_params.items() if v is not None}, override=False - ) - self.agent_actions: Dict[UUID, List[ActionEvent]] = defaultdict(list) - self.events = Events() - - @debug_print_function_params - def on_llm_start( - self, - serialized: Dict[str, Any], - prompts: List[str], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.llm[str(run_id)] = LLMEvent( - params={ - **serialized, - **({} if metadata is None else metadata), - **kwargs, - }, # TODO: params is inconsistent, in ToolEvent we put it in logs - model=get_model_from_kwargs(kwargs), - prompt=prompts[0], - # tags=tags # TODO - ) - - @debug_print_function_params - def on_llm_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - llm_event: LLMEvent = self.events.llm[str(run_id)] - self.ao_client.record(llm_event) - - error_event = ErrorEvent(trigger_event=llm_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_llm_end( - self, - response: LLMResult, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - llm_event: LLMEvent = self.events.llm[str(run_id)] - llm_event.returns = { - "content": response.generations[0][0].text, - "generations": response.generations, - } - llm_event.end_timestamp = get_ISO_time() - llm_event.completion = response.generations[0][0].text - if response.llm_output is not None: - llm_event.prompt_tokens = response.llm_output["token_usage"][ - "prompt_tokens" - ] - llm_event.completion_tokens = response.llm_output["token_usage"][ - "completion_tokens" - ] - - self.ao_client.record(llm_event) - - if len(response.generations) == 0: - # TODO: more descriptive error - error_event = ErrorEvent( - trigger_event=self.events.llm[str(run_id)], - error_type="NoGenerations", - details="on_llm_end: No generations", - ) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_chain_start( - self, - serialized: Dict[str, Any], - inputs: Dict[str, Any], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.chain[str(run_id)] = ActionEvent( - params={ - **serialized, - **inputs, - **({} if metadata is None else metadata), - **kwargs, - }, - action_type="chain", - ) - - @debug_print_function_params - def on_chain_end( - self, - outputs: Dict[str, Any], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event: ActionEvent = self.events.chain[str(run_id)] - action_event.returns = outputs - action_event.end_timestamp = get_ISO_time() - self.ao_client.record(action_event) - - @debug_print_function_params - def on_chain_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event: ActionEvent = self.events.chain[str(run_id)] - self.ao_client.record(action_event) - - error_event = ErrorEvent(trigger_event=action_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_tool_start( - self, - serialized: Dict[str, Any], - input_str: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - inputs: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.tool[str(run_id)] = ToolEvent( - params=input_str if inputs is None else inputs, - name=serialized["name"], - logs={ - **serialized, - "tags": tags, - **({} if metadata is None else metadata), - **({} if inputs is None else inputs), - **kwargs, - }, - ) - - @debug_print_function_params - def on_tool_end( - self, - output: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - tool_event: ToolEvent = self.events.tool[str(run_id)] - tool_event.end_timestamp = get_ISO_time() - tool_event.returns = output - self.ao_client.record(tool_event) - - # Tools are capable of failing `on_tool_end` quietly. - # This is a workaround to make sure we can log it as an error. - if kwargs.get("name") == "_Exception": - error_event = ErrorEvent( - trigger_event=tool_event, - error_type="LangchainToolException", - details=output, - ) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_tool_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - tool_event: ToolEvent = self.events.tool[str(run_id)] - self.ao_client.record(tool_event) - - error_event = ErrorEvent(trigger_event=tool_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_retriever_start( - self, - serialized: Dict[str, Any], - query: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> None: - self.events.retriever[str(run_id)] = ActionEvent( - params={ - **serialized, - "query": query, - **({} if metadata is None else metadata), - **kwargs, - }, - action_type="retriever", - ) - - @debug_print_function_params - def on_retriever_end( - self, - documents: Sequence[Document], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - action_event: ActionEvent = self.events.retriever[str(run_id)] - # TODO: Adding this. Might want to add elsewhere e.g. params - action_event.logs = documents - action_event.end_timestamp = get_ISO_time() - self.ao_client.record(action_event) - - @debug_print_function_params - def on_retriever_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - action_event: ActionEvent = self.events.retriever[str(run_id)] - self.ao_client.record(action_event) - - error_event = ErrorEvent(trigger_event=action_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - def on_agent_action( - self, - action: AgentAction, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - self.agent_actions[run_id].append( - ActionEvent(params={"action": action, **kwargs}, action_type="agent") - ) - - @debug_print_function_params - def on_agent_finish( - self, - finish: AgentFinish, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - # Need to attach finish to some on_agent_action so just choosing the last one - self.agent_actions[run_id][-1].returns = finish.to_json() - - for agentAction in self.agent_actions[run_id]: - self.ao_client.record(agentAction) - - # TODO: Create a way for the end user to set this based on their conditions - # self.ao_client.end_session("Success") #TODO: calling end_session here causes "No current session" - - @debug_print_function_params - def on_retry( - self, - retry_state: RetryCallState, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event = ActionEvent( - params={**kwargs}, - returns=str(retry_state), - action_type="retry", - # result="Indeterminate" # TODO: currently have no way of recording Indeterminate - ) - self.ao_client.record(action_event) - - @property - def session_id(self): - return self.ao_client.current_session_id - - -class AsyncLangchainCallbackHandler(AsyncCallbackHandler): - """Callback handler for Langchain agents.""" - - def __init__( - self, - api_key: Optional[str] = None, - endpoint: Optional[str] = None, - max_wait_time: Optional[int] = None, - max_queue_size: Optional[int] = None, - tags: Optional[List[str]] = None, - ): - - client_params: Dict[str, Any] = { - "api_key": api_key, - "endpoint": endpoint, - "max_wait_time": max_wait_time, - "max_queue_size": max_queue_size, - "tags": tags, - } - - self.ao_client = AOClient( - **{k: v for k, v in client_params.items() if v is not None}, override=False - ) - - self.events = Events() - self.agent_actions: Dict[UUID, List[ActionEvent]] = defaultdict(list) - - @debug_print_function_params - async def on_llm_start( - self, - serialized: Dict[str, Any], - prompts: List[str], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.llm[str(run_id)] = LLMEvent( - params={ - **serialized, - **({} if metadata is None else metadata), - **kwargs, - }, # TODO: params is inconsistent, in ToolEvent we put it in logs - model=kwargs["invocation_params"]["model"], - prompt=prompts[0], - ) - - @debug_print_function_params - async def on_chat_model_start( - self, - serialized: Dict[str, Any], - messages: List[List[BaseMessage]], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - pass - - @debug_print_function_params - async def on_llm_new_token( - self, - token: str, - *, - chunk: Optional[Union[GenerationChunk, ChatGenerationChunk]] = None, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - pass - - @debug_print_function_params - async def on_llm_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - llm_event: LLMEvent = self.events.llm[str(run_id)] - self.ao_client.record(llm_event) - - error_event = ErrorEvent(trigger_event=llm_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_llm_end( - self, - response: LLMResult, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - llm_event: LLMEvent = self.events.llm[str(run_id)] - llm_event.returns = { - "content": response.generations[0][0].text, - "generations": response.generations, - } - llm_event.end_timestamp = get_ISO_time() - llm_event.completion = response.generations[0][0].text - if response.llm_output is not None: - llm_event.prompt_tokens = response.llm_output["token_usage"][ - "prompt_tokens" - ] - llm_event.completion_tokens = response.llm_output["token_usage"][ - "completion_tokens" - ] - self.ao_client.record(llm_event) - - if len(response.generations) == 0: - # TODO: more descriptive error - error_event = ErrorEvent( - trigger_event=self.events.llm[str(run_id)], - error_type="NoGenerations", - details="on_llm_end: No generations", - ) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_chain_start( - self, - serialized: Dict[str, Any], - inputs: Dict[str, Any], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.chain[str(run_id)] = ActionEvent( - params={ - **serialized, - **inputs, - **({} if metadata is None else metadata), - **kwargs, - }, - action_type="chain", - ) - - @debug_print_function_params - async def on_chain_end( - self, - outputs: Dict[str, Any], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event: ActionEvent = self.events.chain[str(run_id)] - action_event.returns = outputs - action_event.end_timestamp = get_ISO_time() - self.ao_client.record(action_event) - - @debug_print_function_params - async def on_chain_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event: ActionEvent = self.events.chain[str(run_id)] - self.ao_client.record(action_event) - - error_event = ErrorEvent(trigger_event=action_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_tool_start( - self, - serialized: Dict[str, Any], - input_str: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - inputs: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> Any: - self.events.tool[str(run_id)] = ToolEvent( - params=input_str if inputs is None else inputs, - name=serialized["name"], - logs={ - **serialized, - "tags": tags, - **({} if metadata is None else metadata), - **({} if inputs is None else inputs), - **kwargs, - }, - ) - - @debug_print_function_params - async def on_tool_end( - self, - output: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - tool_event: ToolEvent = self.events.tool[str(run_id)] - tool_event.end_timestamp = get_ISO_time() - tool_event.returns = output - self.ao_client.record(tool_event) - - # Tools are capable of failing `on_tool_end` quietly. - # This is a workaround to make sure we can log it as an error. - if kwargs.get("name") == "_Exception": - error_event = ErrorEvent( - trigger_event=tool_event, - error_type="LangchainToolException", - details=output, - ) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_tool_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - tool_event: ToolEvent = self.events.tool[str(run_id)] - self.ao_client.record(tool_event) - - error_event = ErrorEvent(trigger_event=tool_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_retriever_start( - self, - serialized: Dict[str, Any], - query: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - metadata: Optional[Dict[str, Any]] = None, - **kwargs: Any, - ) -> None: - self.events.retriever[str(run_id)] = ActionEvent( - params={ - **serialized, - "query": query, - **({} if metadata is None else metadata), - **kwargs, - }, - action_type="retriever", - ) - - @debug_print_function_params - async def on_retriever_end( - self, - documents: Sequence[Document], - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - action_event: ActionEvent = self.events.retriever[str(run_id)] - # TODO: Adding this. Might want to add elsewhere e.g. params - action_event.logs = documents - action_event.end_timestamp = get_ISO_time() - self.ao_client.record(action_event) - - @debug_print_function_params - async def on_retriever_error( - self, - error: BaseException, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - action_event: ActionEvent = self.events.retriever[str(run_id)] - self.ao_client.record(action_event) - - error_event = ErrorEvent(trigger_event=action_event, exception=error) - self.ao_client.record(error_event) - - @debug_print_function_params - async def on_agent_action( - self, - action: AgentAction, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - self.agent_actions[run_id].append( - ActionEvent(params={"action": action, **kwargs}, action_type="agent") - ) - - @debug_print_function_params - async def on_agent_finish( - self, - finish: AgentFinish, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - # Need to attach finish to some on_agent_action so just choosing the last one - self.agent_actions[run_id][-1].returns = finish.to_json() - - for agentAction in self.agent_actions[run_id]: - self.ao_client.record(agentAction) - - # TODO: Create a way for the end user to set this based on their conditions - # self.ao_client.end_session("Success") #TODO: calling end_session here causes "No current session" - - @debug_print_function_params - async def on_text( - self, - text: str, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - tags: Optional[List[str]] = None, - **kwargs: Any, - ) -> None: - pass - - @debug_print_function_params - async def on_retry( - self, - retry_state: RetryCallState, - *, - run_id: UUID, - parent_run_id: Optional[UUID] = None, - **kwargs: Any, - ) -> Any: - action_event = ActionEvent( - params={**kwargs}, - returns=str(retry_state), - action_type="retry", - # result="Indeterminate" # TODO: currently have no way of recording Indeterminate - ) - self.ao_client.record(action_event) - - @property - async def session_id(self): - return self.ao_client.current_session_id diff --git a/agentops/log_config.py b/agentops/log_config.py index ea04ddc48..f814fb52b 100644 --- a/agentops/log_config.py +++ b/agentops/log_config.py @@ -1,31 +1,53 @@ import logging -from sys import prefix +import os +import re -class AgentOpsFormatter(logging.Formatter): +class AgentOpsLogFormatter(logging.Formatter): blue = "\x1b[34m" bold_red = "\x1b[31;1m" reset = "\x1b[0m" - prefix = "🖇 AgentOps: %(message)s" + prefix = "🖇 AgentOps: " FORMATS = { - logging.DEBUG: f"(DEBUG) {prefix}", - logging.INFO: f"{prefix}", - logging.WARNING: f"{prefix}", - logging.ERROR: f"{bold_red}{prefix}{reset}", - logging.CRITICAL: f"{bold_red}{prefix}{reset}", + logging.DEBUG: f"(DEBUG) {prefix}%(message)s", + logging.INFO: f"{prefix}%(message)s", + logging.WARNING: f"{prefix}%(message)s", + logging.ERROR: f"{bold_red}{prefix}%(message)s{reset}", + logging.CRITICAL: f"{bold_red}{prefix}%(message)s{reset}", } def format(self, record): - log_fmt = self.FORMATS.get(record.levelno) + log_fmt = self.FORMATS.get(record.levelno, self.FORMATS[logging.INFO]) formatter = logging.Formatter(log_fmt) return formatter.format(record) logger = logging.getLogger("agentops") +logger.propagate = False logger.setLevel(logging.CRITICAL) -handler = logging.StreamHandler() -handler.setLevel(logging.DEBUG) -handler.setFormatter(AgentOpsFormatter()) -logger.addHandler(handler) +# Streaming Handler +stream_handler = logging.StreamHandler() +stream_handler.setLevel(logging.DEBUG) +stream_handler.setFormatter(AgentOpsLogFormatter()) +logger.addHandler(stream_handler) + + +# File Handler +class AgentOpsLogFileFormatter(logging.Formatter): + def format(self, record): + # Remove ANSI escape codes from the message + record.msg = ANSI_ESCAPE_PATTERN.sub("", record.msg) + return super().format(record) + + +ANSI_ESCAPE_PATTERN = re.compile(r"\x1b\[[0-9;]*m") +log_to_file = os.environ.get("AGENTOPS_LOGGING_TO_FILE", "True").lower() == "true" +if log_to_file: + file_handler = logging.FileHandler("agentops.log", mode="w") + file_handler.setLevel(logging.DEBUG) + formatter = AgentOpsLogFileFormatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) diff --git a/agentops/meta_client.py b/agentops/meta_client.py index 51f1630d0..6e2227712 100644 --- a/agentops/meta_client.py +++ b/agentops/meta_client.py @@ -37,12 +37,14 @@ def send_exception_to_server(cls, exception, api_key, session): if session: developer_error["session_id"] = session.session_id - - HttpClient.post( - "https://api.agentops.ai/v2/developer_errors", - safe_serialize(developer_error).encode("utf-8"), - api_key=api_key, - ) + try: + HttpClient.post( + "https://api.agentops.ai/v2/developer_errors", + safe_serialize(developer_error).encode("utf-8"), + api_key=api_key, + ) + except: + pass def handle_exceptions(method): diff --git a/agentops/session.py b/agentops/session.py index d9edbe2f9..5abb96293 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -3,14 +3,17 @@ import json import threading import time +from decimal import ROUND_HALF_UP, Decimal +from termcolor import colored +from typing import Optional, List, Union +from uuid import UUID, uuid4 +from .exceptions import ApiServerException +from .enums import EndState from .event import ErrorEvent, Event from .log_config import logger -from .config import ClientConfiguration +from .config import Configuration from .helpers import get_ISO_time, filter_unjsonable, safe_serialize -from typing import Optional, List, Union -from uuid import UUID, uuid4 - from .http_client import HttpClient @@ -33,15 +36,15 @@ class Session: def __init__( self, session_id: UUID, + config: Configuration, tags: Optional[List[str]] = None, host_env: Optional[dict] = None, - config: Optional[ClientConfiguration] = None, ): self.end_timestamp = None self.end_state: Optional[str] = None self.session_id = session_id self.init_timestamp = get_ISO_time() - self.tags = tags + self.tags: List[str] = tags or [] self.video: Optional[str] = None self.end_state_reason: Optional[str] = None self.host_env = host_env @@ -55,7 +58,10 @@ def __init__( self.thread.daemon = True self.thread.start() - self._start_session() + self.is_running = self._start_session() + if self.is_running == False: + self.stop_flag.set() + self.thread.join(timeout=1) def set_video(self, video: str) -> None: """ @@ -67,11 +73,25 @@ def set_video(self, video: str) -> None: self.video = video def end_session( - self, end_state: str = "Indeterminate", end_state_reason: Optional[str] = None - ) -> str: + self, + end_state: str = "Indeterminate", + end_state_reason: Optional[str] = None, + video: Optional[str] = None, + ) -> Union[Decimal, None]: + + if not self.is_running: + return + + if not any(end_state == state.value for state in EndState): + return logger.warning( + "Invalid end_state. Please use one of the EndState enums" + ) + self.end_timestamp = get_ISO_time() self.end_state = end_state self.end_state_reason = end_state_reason + if video is not None: + self.video = video self.stop_flag.set() self.thread.join(timeout=1) @@ -79,15 +99,45 @@ def end_session( with self.lock: payload = {"session": self.__dict__} + try: + res = HttpClient.post( + f"{self.config.endpoint}/v2/update_session", + json.dumps(filter_unjsonable(payload)).encode("utf-8"), + jwt=self.jwt, + ) + except ApiServerException as e: + return logger.error(f"Could not end session - {e}") - res = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - jwt=self.jwt, + logger.debug(res.body) + token_cost = res.body.get("token_cost", "unknown") + + if token_cost == "unknown" or token_cost is None: + logger.info("Could not determine cost of run.") + token_cost_d = Decimal(0) + else: + token_cost_d = Decimal(token_cost) + logger.info( + "This run's cost ${}".format( + "{:.2f}".format(token_cost_d) + if token_cost_d == 0 + else "{:.6f}".format( + token_cost_d.quantize( + Decimal("0.000001"), rounding=ROUND_HALF_UP + ) + ) + ) ) - logger.debug(res.body) - self.queue = [] - return res.body.get("token_cost", "unknown") + + logger.info( + colored( + f"\x1b[34mSession Replay: https://app.agentops.ai/drilldown?session_id={self.session_id}\x1b[0m", + "blue", + ) + ) + + active_sessions.remove(self) + + return token_cost_d def add_tags(self, tags: List[str]) -> None: """ @@ -96,11 +146,12 @@ def add_tags(self, tags: List[str]) -> None: Args: tags (List[str]): The list of tags to append. """ + if not self.is_running: + return - # if a string and not a list of strings if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): # if it's a single string - tags = [tags] # make it a list + if isinstance(tags, str): + tags = [tags] if self.tags is None: self.tags = tags @@ -112,14 +163,19 @@ def add_tags(self, tags: List[str]) -> None: self._update_session() def set_tags(self, tags): + if not self.is_running: + return + if not (isinstance(tags, list) and all(isinstance(item, str) for item in tags)): - if isinstance(tags, str): # if it's a single string - tags = [tags] # make it a list + if isinstance(tags, str): + tags = [tags] self.tags = tags self._update_session() def record(self, event: Union[Event, ErrorEvent]): + if not self.is_running: + return if isinstance(event, Event): if not event.end_timestamp or event.init_timestamp == event.end_timestamp: event.end_timestamp = get_ISO_time() @@ -170,12 +226,16 @@ def _start_session(self): with self.lock: payload = {"session": self.__dict__} serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8") - res = HttpClient.post( - f"{self.config.endpoint}/v2/create_session", - serialized_payload, - self.config.api_key, - self.config.parent_key, - ) + + try: + res = HttpClient.post( + f"{self.config.endpoint}/v2/create_session", + serialized_payload, + self.config.api_key, + self.config.parent_key, + ) + except ApiServerException as e: + return logger.error(f"Could not start session - {e}") logger.debug(res.body) @@ -190,16 +250,23 @@ def _start_session(self): return True def _update_session(self) -> None: + if not self.is_running: + return with self.lock: payload = {"session": self.__dict__} - res = HttpClient.post( - f"{self.config.endpoint}/v2/update_session", - json.dumps(filter_unjsonable(payload)).encode("utf-8"), - jwt=self.jwt, - ) + try: + res = HttpClient.post( + f"{self.config.endpoint}/v2/update_session", + json.dumps(filter_unjsonable(payload)).encode("utf-8"), + jwt=self.jwt, + ) + except ApiServerException as e: + return logger.error(f"Could not update session - {e}") def _flush_queue(self) -> None: + if not self.is_running: + return with self.lock: queue_copy = copy.deepcopy(self.queue) # Copy the current items self.queue = [] @@ -210,11 +277,14 @@ def _flush_queue(self) -> None: } serialized_payload = safe_serialize(payload).encode("utf-8") - HttpClient.post( - f"{self.config.endpoint}/v2/create_events", - serialized_payload, - jwt=self.jwt, - ) + try: + HttpClient.post( + f"{self.config.endpoint}/v2/create_events", + serialized_payload, + jwt=self.jwt, + ) + except ApiServerException as e: + return logger.error(f"Could not post events - {e}") logger.debug("\n") logger.debug( @@ -230,6 +300,8 @@ def _run(self) -> None: self._flush_queue() def create_agent(self, name, agent_id): + if not self.is_running: + return if agent_id is None: agent_id = str(uuid4()) @@ -239,9 +311,14 @@ def create_agent(self, name, agent_id): } serialized_payload = safe_serialize(payload).encode("utf-8") - HttpClient.post( - f"{self.config.endpoint}/v2/create_agent", serialized_payload, jwt=self.jwt - ) + try: + HttpClient.post( + f"{self.config.endpoint}/v2/create_agent", + serialized_payload, + jwt=self.jwt, + ) + except ApiServerException as e: + return logger.error(f"Could not create agent - {e}") return agent_id @@ -252,3 +329,6 @@ def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper + + +active_sessions: List[Session] = [] diff --git a/agentops/state.py b/agentops/state.py deleted file mode 100644 index d29b093ff..000000000 --- a/agentops/state.py +++ /dev/null @@ -1,9 +0,0 @@ -state_dict = {} - - -def set_state(key: str, value: any): - state_dict[key] = value - - -def get_state(key: str) -> any: - return state_dict.get(key) diff --git a/docs/v1/concepts/tags.mdx b/docs/v1/concepts/tags.mdx index 751f7ec48..ac30a336b 100644 --- a/docs/v1/concepts/tags.mdx +++ b/docs/v1/concepts/tags.mdx @@ -8,15 +8,18 @@ Ex: `paid-user`, `v3`, `recruiter-agent`' # How to use Tags -Tags belong to [Sessions](/v1/concepts/tags), but can be assigned in multiple ways. +There are two types: `tags` and `default tags`. + +Tags belong to [Sessions](/v1/concepts/sessions) and are a fantastic way to organize your Sessions. When a session is created, `default_tags` +are automatically assigned to the session alongside any tags that are passed in. ## On `init()` -Passing tags into the `init` function will assign these tags to the `Session` +Passing `default_tags` into the `init` function will assign these tags to the all subsequent Sessions. ```python python -agentops.init(tags=['test tag']) +agentops.init(default_tags=['test tag']) ``` @@ -32,7 +35,7 @@ agentops.set_tags(['test new tag']) ## `add_tags([str])` -This will append an array of tags to the Session's existing tags +This will append an array of tags to the session's existing tags. ```python python diff --git a/docs/v1/usage/environment-variables.mdx b/docs/v1/usage/environment-variables.mdx index 7389f9dc0..03d6ff98e 100644 --- a/docs/v1/usage/environment-variables.mdx +++ b/docs/v1/usage/environment-variables.mdx @@ -26,6 +26,8 @@ AGENTOPS_PARENT_KEY= AGENTOPS_API_ENDPOINT=https://api.agentops.ai # Logging level. . Defaults to INFO AGENTOPS_LOGGING_LEVEL=INFO +# Write logs to file . Defaults to TRUE +AGENTOPS_LOGGING_TO_FILE=TRUE # Whether to opt out of recording environment data. . Defaults to FALSE AGENTOPS_ENV_DATA_OPT_OUT=FALSE ``` diff --git a/pyproject.toml b/pyproject.toml index 56a171b0e..e186b4a65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "agentops" -version = "0.3.4" +version = "0.3.5" authors = [ { name="Alex Reibman", email="areibman@gmail.com" }, { name="Shawn Qiu", email="siyangqiu@gmail.com" }, diff --git a/tach.yml b/tach.yml index b0b2cc243..08b7b475f 100644 --- a/tach.yml +++ b/tach.yml @@ -1,96 +1,91 @@ -# yaml-language-server: $schema=https://raw.githubusercontent.com/gauge-sh/tach/v0.8.2/public/tach-yml-schema.json +# yaml-language-server: $schema=https://raw.githubusercontent.com/gauge-sh/tach/v0.9.0/public/tach-yml-schema.json modules: - path: agentops depends_on: - - agentops.agent - - agentops.client - - agentops.config - - agentops.decorators - - agentops.event - - agentops.log_config - - agentops.partners - - agentops.session + - path: agentops.client + - path: agentops.decorators + - path: agentops.event + - path: agentops.log_config + - path: agentops.partners + - path: agentops.session - path: agentops.agent - depends_on: - - agentops - - agentops.log_config + depends_on: [] - path: agentops.client depends_on: - - agentops - - agentops.config - - agentops.enums - - agentops.event - - agentops.exceptions - - agentops.helpers - - agentops.host_env - - agentops.llm_tracker - - agentops.log_config - - agentops.meta_client - - agentops.partners - - agentops.session + - path: agentops.config + - path: agentops.event + - path: agentops.helpers + - path: agentops.host_env + - path: agentops.llm_tracker + - path: agentops.log_config + - path: agentops.meta_client + - path: agentops.partners + - path: agentops.session - path: agentops.config depends_on: - - agentops.exceptions + - path: agentops.log_config - path: agentops.decorators depends_on: - - agentops - - agentops.client - - agentops.session + - path: agentops.client + - path: agentops.event + - path: agentops.helpers + - path: agentops.log_config + - path: agentops.session - path: agentops.enums depends_on: [] - path: agentops.event depends_on: - - agentops.enums - - agentops.helpers + - path: agentops.enums + - path: agentops.helpers - path: agentops.exceptions depends_on: - - agentops.log_config + - path: agentops.log_config - path: agentops.helpers depends_on: - - agentops.log_config + - path: agentops.log_config - path: agentops.host_env depends_on: - - agentops.helpers - - agentops.log_config + - path: agentops.helpers + - path: agentops.log_config - path: agentops.http_client depends_on: - - agentops.log_config + - path: agentops.exceptions - path: agentops.langchain_callback_handler - depends_on: - - agentops - - agentops.helpers + depends_on: [] - path: agentops.llm_tracker depends_on: - - agentops.event - - agentops.helpers - - agentops.log_config - - agentops.session + - path: agentops.event + - path: agentops.helpers + - path: agentops.log_config + - path: agentops.session - path: agentops.log_config depends_on: [] - path: agentops.meta_client depends_on: - - agentops.helpers - - agentops.host_env - - agentops.http_client - - agentops.log_config + - path: agentops.helpers + - path: agentops.host_env + - path: agentops.http_client + - path: agentops.log_config - path: agentops.partners depends_on: - - agentops - - agentops.enums - - agentops.helpers - - agentops.log_config + - path: agentops + - path: agentops.enums + - path: agentops.helpers + - path: agentops.log_config - path: agentops.session depends_on: - - agentops.config - - agentops.event - - agentops.helpers - - agentops.http_client - - agentops.log_config + - path: agentops.client + - path: agentops.config + - path: agentops.enums + - path: agentops.event + - path: agentops.exceptions + - path: agentops.helpers + - path: agentops.http_client + - path: agentops.log_config - path: depends_on: - - agentops - - agentops.enums - - agentops.helpers + - path: agentops + - path: agentops.helpers exclude: - .*__pycache__ - .*egg-info @@ -98,4 +93,5 @@ exclude: - examples - tests - venv -source_root: . +source_roots: + - . diff --git a/tests/test_canary.py b/tests/test_canary.py index 57ff58c3b..a89cf3447 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -1,5 +1,4 @@ import pytest -import requests import requests_mock import time import agentops @@ -30,7 +29,7 @@ def mock_req(): class TestCanary: def setup_method(self): self.url = "https://api.agentops.ai" - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" agentops.init(api_key=self.api_key, max_wait_time=5, auto_start_session=False) def test_agent_ops_record(self, mock_req): diff --git a/tests/test_events.py b/tests/test_events.py index 6f41f5809..5234f3fad 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -28,11 +28,8 @@ def mock_req(): class TestEvents: def setup_method(self): - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" self.event_type = "test_event_type" - self.config = agentops.ClientConfiguration( - api_key=self.api_key, max_wait_time=50, max_queue_size=1 - ) def test_record_timestamp(self, mock_req): agentops.init(api_key=self.api_key) diff --git a/tests/test_record_function.py b/tests/test_record_function.py index 24ebeb604..2fc8cd299 100644 --- a/tests/test_record_function.py +++ b/tests/test_record_function.py @@ -42,7 +42,7 @@ def create_session_response(request, context): class TestRecordAction: def setup_method(self): self.url = "https://api.agentops.ai" - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" self.event_type = "test_event_type" agentops.init(self.api_key, max_wait_time=5, auto_start_session=False) @@ -114,12 +114,8 @@ async def async_add(x, y): assert request_json["events"][0]["params"] == {"x": 3, "y": 4} assert request_json["events"][0]["returns"] == 7 - init = datetime.fromisoformat( - request_json["events"][0]["init_timestamp"].replace("Z", "+00:00") - ) - end = datetime.fromisoformat( - request_json["events"][0]["end_timestamp"].replace("Z", "+00:00") - ) + init = datetime.fromisoformat(request_json["events"][0]["init_timestamp"]) + end = datetime.fromisoformat(request_json["events"][0]["end_timestamp"]) assert (end - init).total_seconds() >= 0.1 @@ -128,6 +124,8 @@ async def async_add(x, y): def test_multiple_sessions_sync(self, mock_req): session_1 = agentops.start_session() session_2 = agentops.start_session() + assert session_1 is not None + assert session_2 is not None # Arrange @record_function(event_name=self.event_type) @@ -172,6 +170,8 @@ def add_three(x, y, z=3): async def test_multiple_sessions_async(self, mock_req): session_1 = agentops.start_session() session_2 = agentops.start_session() + assert session_1 is not None + assert session_2 is not None # Arrange @record_function(self.event_type) diff --git a/tests/test_session.py b/tests/test_session.py index 3ab52f537..f54620857 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -30,16 +30,25 @@ def mock_req(): yield m +class TestNonInitializedSessions: + def setup_method(self): + self.api_key = "11111111-1111-4111-8111-111111111111" + self.event_type = "test_event_type" + + def test_non_initialized_doesnt_start_session(self, mock_req): + agentops.set_api_key(self.api_key) + session = agentops.start_session() + assert session is None + + class TestSingleSessions: def setup_method(self): - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" self.event_type = "test_event_type" - self.config = agentops.ClientConfiguration( - api_key=self.api_key, max_wait_time=50 - ) + agentops.init(api_key=self.api_key, max_wait_time=50, auto_start_session=False) def test_session(self, mock_req): - agentops.start_session(config=self.config) + agentops.start_session() agentops.record(ActionEvent(self.event_type)) agentops.record(ActionEvent(self.event_type)) @@ -63,14 +72,14 @@ def test_session(self, mock_req): assert mock_req.last_request.headers["Authorization"] == f"Bearer some_jwt" request_json = mock_req.last_request.json() assert request_json["session"]["end_state"] == end_state - assert request_json["session"]["tags"] == None + assert len(request_json["session"]["tags"]) == 0 agentops.end_all_sessions() def test_add_tags(self, mock_req): # Arrange tags = ["GPT-4"] - agentops.start_session(tags=tags, config=self.config) + agentops.start_session(tags=tags) agentops.add_tags(["test-tag", "dupe-tag"]) agentops.add_tags(["dupe-tag"]) @@ -90,7 +99,7 @@ def test_add_tags(self, mock_req): def test_tags(self, mock_req): # Arrange tags = ["GPT-4"] - agentops.start_session(tags=tags, config=self.config) + agentops.start_session(tags=tags) # Act agentops.record(ActionEvent(self.event_type)) @@ -112,9 +121,7 @@ def test_tags(self, mock_req): def test_inherit_session_id(self, mock_req): # Arrange inherited_id = "4f72e834-ff26-4802-ba2d-62e7613446f1" - agentops.start_session( - tags=["test"], config=self.config, inherited_session_id=inherited_id - ) + agentops.start_session(tags=["test"], inherited_session_id=inherited_id) # Act # session_id correct @@ -129,63 +136,54 @@ def test_inherit_session_id(self, mock_req): agentops.end_all_sessions() def test_add_tags_with_string(self, mock_req): - agentops.start_session(config=self.config) + agentops.start_session() agentops.add_tags("wrong-type-tags") request_json = mock_req.last_request.json() assert request_json["session"]["tags"] == ["wrong-type-tags"] def test_session_add_tags_with_string(self, mock_req): - session = agentops.start_session(config=self.config) + session = agentops.start_session() session.add_tags("wrong-type-tags") request_json = mock_req.last_request.json() assert request_json["session"]["tags"] == ["wrong-type-tags"] def test_set_tags_with_string(self, mock_req): - agentops.start_session(config=self.config) + agentops.start_session() agentops.set_tags("wrong-type-tags") request_json = mock_req.last_request.json() assert request_json["session"]["tags"] == ["wrong-type-tags"] def test_session_set_tags_with_string(self, mock_req): - session = agentops.start_session(config=self.config) + session = agentops.start_session() + assert session is not None + session.set_tags("wrong-type-tags") request_json = mock_req.last_request.json() assert request_json["session"]["tags"] == ["wrong-type-tags"] - def test_add_tags_before_session(self, mock_req): - agentops.add_tags(["pre-session-tag"]) - agentops.start_session(config=self.config) - - request_json = mock_req.last_request.json() - assert request_json["session"]["tags"] == ["pre-session-tag"] - def test_set_tags_before_session(self, mock_req): - agentops.set_tags(["pre-session-tag"]) - agentops.start_session(config=self.config) + agentops.configure(default_tags=["pre-session-tag"]) + agentops.start_session() request_json = mock_req.last_request.json() assert request_json["session"]["tags"] == ["pre-session-tag"] - def test_no_config_doesnt_start_session(self, mock_req): - session = agentops.start_session() - assert session is None - def test_safe_get_session_no_session(self, mock_req): session = Client()._safe_get_session() assert session is None def test_safe_get_session_with_session(self, mock_req): - agentops.start_session(config=self.config) + agentops.start_session() session = Client()._safe_get_session() assert session is not None def test_safe_get_session_with_multiple_sessions(self, mock_req): - agentops.start_session(config=self.config) - agentops.start_session(config=self.config) + agentops.start_session() + agentops.start_session() session = Client()._safe_get_session() assert session is None @@ -193,15 +191,15 @@ def test_safe_get_session_with_multiple_sessions(self, mock_req): class TestMultiSessions: def setup_method(self): - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" self.event_type = "test_event_type" - self.config = agentops.ClientConfiguration( - api_key=self.api_key, max_wait_time=50 - ) + agentops.init(api_key=self.api_key, max_wait_time=50, auto_start_session=False) def test_two_sessions(self, mock_req): - session_1 = agentops.start_session(config=self.config) - session_2 = agentops.start_session(config=self.config) + session_1 = agentops.start_session() + session_2 = agentops.start_session() + assert session_1 is not None + assert session_2 is not None assert len(agentops.Client().current_session_ids) == 2 assert agentops.Client().current_session_ids == [ @@ -233,7 +231,7 @@ def test_two_sessions(self, mock_req): assert mock_req.last_request.headers["Authorization"] == f"Bearer some_jwt" request_json = mock_req.last_request.json() assert request_json["session"]["end_state"] == end_state - assert request_json["session"]["tags"] is None + assert len(request_json["session"]["tags"]) == 0 session_2.end_session(end_state) # We should have 6 requests (2 additional end sessions) @@ -241,15 +239,17 @@ def test_two_sessions(self, mock_req): assert mock_req.last_request.headers["Authorization"] == f"Bearer some_jwt" request_json = mock_req.last_request.json() assert request_json["session"]["end_state"] == end_state - assert request_json["session"]["tags"] is None + assert len(request_json["session"]["tags"]) == 0 def test_add_tags(self, mock_req): # Arrange session_1_tags = ["session-1"] session_2_tags = ["session-2"] - session_1 = agentops.start_session(tags=session_1_tags, config=self.config) - session_2 = agentops.start_session(tags=session_2_tags, config=self.config) + session_1 = agentops.start_session(tags=session_1_tags) + session_2 = agentops.start_session(tags=session_2_tags) + assert session_1 is not None + assert session_2 is not None session_1.add_tags(["session-1-added", "session-1-added-2"]) session_2.add_tags(["session-2-added"]) diff --git a/tests/test_teardown.py b/tests/test_teardown.py index 2a59e297f..b86ea0792 100644 --- a/tests/test_teardown.py +++ b/tests/test_teardown.py @@ -18,9 +18,10 @@ def mock_req(): class TestSessions: def setup_method(self): - self.api_key = "random_api_key" + self.api_key = "11111111-1111-4111-8111-111111111111" self.event_type = "test_event_type" - self.client = Client(self.api_key) + Client().configure(api_key=self.api_key) + Client().initialize() def test_exit(self): # Tests should not hang.