From bdb7a412e68226909e858cb39f2530006a804cda Mon Sep 17 00:00:00 2001 From: Howard Gil Date: Thu, 25 Apr 2024 19:22:02 -0700 Subject: [PATCH] Instrument litellm (#163) * Setting session to none if server does not return 200 for /sessions * WIP. Changing override logic * Working * Tidying * WIP * Finished litellm * Added back override with deprecation message. Docstring fixes * Indent * Updated minimum supported litellm version * syntax --- agentops/__init__.py | 51 +++++++++++++-- agentops/client.py | 136 +++++++++++++++++++++++----------------- agentops/llm_tracker.py | 77 ++++++++++++++++------- 3 files changed, 183 insertions(+), 81 deletions(-) diff --git a/agentops/__init__.py b/agentops/__init__.py index 0de3019d..6cd37a41 100755 --- a/agentops/__init__.py +++ b/agentops/__init__.py @@ -17,7 +17,8 @@ def init(api_key: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, tags: Optional[List[str]] = None, - override=True, + override: Optional[bool] = None, # Deprecated + instrument_llm_calls=True, auto_start_session=True, inherited_session_id: Optional[str] = None ): @@ -37,11 +38,12 @@ def init(api_key: Optional[str] = None, max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. tags (List[str], optional): Tags for the sessions that can be used for grouping or sorting later (e.g. ["GPT-4"]). - override (bool): Whether to override and LLM calls to emit as events. + override (bool, optional): [Deprecated] Use `instrument_llm_calls` instead. Whether to instrument LLM calls and emit LLMEvents.. + instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents.. auto_start_session (bool): Whether to start a session automatically when the client is created. inherited_session_id (optional, str): Init Agentops with an existing Session Attributes: - """ + """ set_logging_level_info() c = Client(api_key=api_key, parent_key=parent_key, @@ -50,32 +52,67 @@ def init(api_key: Optional[str] = None, max_queue_size=max_queue_size, tags=tags, override=override, + instrument_llm_calls=instrument_llm_calls, auto_start_session=auto_start_session, inherited_session_id=inherited_session_id ) - + return inherited_session_id or c.current_session_id def end_session(end_state: str, end_state_reason: Optional[str] = None, video: Optional[str] = None): + """ + End the current session with the AgentOps service. + + Args: + end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate. + end_state_reason (str, optional): The reason for ending the session. + video (str, optional): URL to a video recording of the session + """ Client().end_session(end_state, end_state_reason, video) def start_session(tags: Optional[List[str]] = None, config: Optional[Configuration] = None, inherited_session_id: Optional[str] = None): + """ + Start a new session for recording events. + + Args: + tags (List[str], optional): Tags that can be used for grouping or sorting later. + e.g. ["test_run"]. + config: (Configuration, optional): Client configuration object + """ return Client().start_session(tags, config, inherited_session_id) def record(event: Event | ErrorEvent): + """ + Record an event with the AgentOps service. + + Args: + event (Event): The event to record. + """ Client().record(event) def add_tags(tags: List[str]): + """ + Append to session tags at runtime. + + Args: + tags (List[str]): The list of tags to append. + """ Client().add_tags(tags) def set_tags(tags: List[str]): + """ + Replace session tags at runtime. + + Args: + tags (List[str]): The list of tags to set. + """ Client().set_tags(tags) @@ -84,4 +121,10 @@ def get_api_key() -> str: def set_parent_key(parent_key): + """ + Set the parent API key which has visibility to projects it is parent to. + + Args: + parent_key (str): The API key of the parent organization to set. + """ Client().set_parent_key(parent_key) diff --git a/agentops/client.py b/agentops/client.py index fe64799a..d6a4c14a 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -1,8 +1,8 @@ """ -AgentOps client module that provides a client class with public interfaces and configuration. + AgentOps client module that provides a client class with public interfaces and configuration. -Classes: - Client: Provides methods to interact with the AgentOps service. + Classes: + Client: Provides methods to interact with the AgentOps service. """ from .event import ActionEvent, ErrorEvent, Event @@ -29,27 +29,28 @@ @singleton class Client(metaclass=MetaClient): """ - Client for AgentOps service. - - Args: - - api_key (str, optional): API Key for AgentOps services. If none is provided, key will - be read from the AGENTOPS_API_KEY environment variable. - parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. If none is provided, key will - be read from the AGENTOPS_PARENT_KEY environment variable. - endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will - be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. - max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. - Defaults to 30,000 (30 seconds) - max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. - tags (List[str], optional): Tags for the sessions that can be used for grouping or - sorting later (e.g. ["GPT-4"]). - override (bool): Whether to override and LLM calls to emit as events. - auto_start_session (bool): Whether to start a session automatically when the client is created. - inherited_session_id (optional, str): Init Agentops with an existing Session - Attributes: - _session (Session, optional): A Session is a grouping of events (e.g. a run of your agent). - _worker (Worker, optional): A Worker manages the event queue and sends session updates to the AgentOps api server + Client for AgentOps service. + + Args: + + api_key (str, optional): API Key for AgentOps services. If none is provided, key will + be read from the AGENTOPS_API_KEY environment variable. + parent_key (str, optional): Organization key to give visibility of all user sessions the user's organization. If none is provided, key will + be read from the AGENTOPS_PARENT_KEY environment variable. + endpoint (str, optional): The endpoint for the AgentOps service. If none is provided, key will + be read from the AGENTOPS_API_ENDPOINT environment variable. Defaults to 'https://api.agentops.ai'. + max_wait_time (int, optional): The maximum time to wait in milliseconds before flushing the queue. + Defaults to 30,000 (30 seconds) + max_queue_size (int, optional): The maximum size of the event queue. Defaults to 100. + tags (List[str], optional): Tags for the sessions that can be used for grouping or + sorting later (e.g. ["GPT-4"]). + override (bool, optional): [Deprecated] Use `instrument_llm_calls` instead. Whether to instrument LLM calls and emit LLMEvents.. + instrument_llm_calls (bool): Whether to instrument LLM calls and emit LLMEvents.. + auto_start_session (bool): Whether to start a session automatically when the client is created. + inherited_session_id (optional, str): Init Agentops with an existing Session + Attributes: + _session (Session, optional): A Session is a grouping of events (e.g. a run of your agent). + _worker (Worker, optional): A Worker manages the event queue and sends session updates to the AgentOps api server """ def __init__(self, @@ -59,11 +60,17 @@ def __init__(self, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, tags: Optional[List[str]] = None, - override=True, + override: Optional[bool] = None, # Deprecated + instrument_llm_calls=True, auto_start_session=True, inherited_session_id: Optional[str] = None ): + if override is not None: + logger.warning("🖇 AgentOps: The 'override' parameter is deprecated. Use 'instrument_llm_calls' instead.", + DeprecationWarning, stacklevel=2) + instrument_llm_calls = instrument_llm_calls or override + self._session = None self._worker = None self._tags = tags @@ -82,12 +89,17 @@ def __init__(self, if auto_start_session: self.start_session(tags, self.config, inherited_session_id) - if override: - if 'openai' in sys.modules: - self.llm_tracker = LlmTracker(self) - self.llm_tracker.override_api('openai') + if instrument_llm_calls: + self.llm_tracker = LlmTracker(self) + self.llm_tracker.override_api() def add_tags(self, tags: List[str]): + """ + Append to session tags at runtime. + + Args: + tags (List[str]): The list of tags to append. + """ if self._tags is not None: self._tags.extend(tags) else: @@ -98,6 +110,12 @@ def add_tags(self, tags: List[str]): self._worker.update_session(self._session) def set_tags(self, tags: List[str]): + """ + Replace session tags at runtime. + + Args: + tags (List[str]): The list of tags to set. + """ self._tags = tags if self._session is not None: @@ -106,10 +124,10 @@ def set_tags(self, tags: List[str]): def record(self, event: Event | ErrorEvent): """ - Record an event with the AgentOps service. + Record an event with the AgentOps service. - Args: - event (Event): The event to record. + Args: + event (Event): The event to record. """ if self._session is not None and not self._session.has_ended: @@ -198,16 +216,16 @@ async def _record_event_async(self, func, event_name, *args, **kwargs): def start_session(self, tags: Optional[List[str]] = None, config: Optional[Configuration] = None, inherited_session_id: Optional[str] = None): """ - Start a new session for recording events. + Start a new session for recording events. - Args: - tags (List[str], optional): Tags that can be used for grouping or sorting later. - e.g. ["test_run"]. - config: (Configuration, optional): Client configuration object - inherited_session_id (optional, str): assign session id to match existing Session + Args: + tags (List[str], optional): Tags that can be used for grouping or sorting later. + e.g. ["test_run"]. + config: (Configuration, optional): Client configuration object + inherited_session_id (optional, str): assign session id to match existing Session """ set_logging_level_info() - + if self._session is not None: return logger.warning("🖇 AgentOps: Cannot start session - session already started") @@ -222,7 +240,7 @@ def start_session(self, tags: Optional[List[str]] = None, config: Optional[Confi return logger.warning("🖇 AgentOps: Cannot start session") logger.info('View info on this session at https://app.agentops.ai/drilldown?session_id={}' - .format(self._session.session_id)) + .format(self._session.session_id)) return self._session.session_id @@ -231,12 +249,12 @@ def end_session(self, end_state_reason: Optional[str] = None, video: Optional[str] = None): """ - End the current session with the AgentOps service. + End the current session with the AgentOps service. - Args: - end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate. - end_state_reason (str, optional): The reason for ending the session. - video (str, optional): The video screen recording of the session + Args: + end_state (str): The final state of the session. Options: Success, Fail, or Indeterminate. + end_state_reason (str, optional): The reason for ending the session. + video (str, optional): The video screen recording of the session """ if self._session is None or self._session.has_ended: return logger.warning("🖇 AgentOps: Cannot end session - no current session") @@ -267,11 +285,11 @@ def cleanup(end_state: Optional[str] = 'Fail', end_state_reason: Optional[str] = def signal_handler(signum, frame): """ - Signal handler for SIGINT (Ctrl+C) and SIGTERM. Ends the session and exits the program. + Signal handler for SIGINT (Ctrl+C) and SIGTERM. Ends the session and exits the program. - Args: - signum (int): The signal number. - frame: The current stack frame. + Args: + signum (int): The signal number. + frame: The current stack frame. """ signal_name = 'SIGINT' if signum == signal.SIGINT else 'SIGTERM' logger.info( @@ -282,13 +300,13 @@ def signal_handler(signum, frame): def handle_exception(exc_type, exc_value, exc_traceback): """ - Handle uncaught exceptions before they result in program termination. + Handle uncaught exceptions before they result in program termination. - Args: - exc_type (Type[BaseException]): The type of the exception. - exc_value (BaseException): The exception instance. - exc_traceback (TracebackType): A traceback object encapsulating the call stack at the - point where the exception originally occurred. + Args: + exc_type (Type[BaseException]): The type of the exception. + exc_value (BaseException): The exception instance. + exc_traceback (TracebackType): A traceback object encapsulating the call stack at the + point where the exception originally occurred. """ formatted_traceback = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback)) @@ -315,7 +333,13 @@ def current_session_id(self): def api_key(self): return self.config.api_key - def set_parent_key(self, parent_key): + def set_parent_key(self, parent_key: str): + """ + Set the parent API key which has visibility to projects it is parent to. + + Args: + parent_key (str): The API key of the parent organization to set. + """ if self._worker: self._worker.config.parent_key = parent_key diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py index 3602f0bc..539bd56a 100644 --- a/agentops/llm_tracker.py +++ b/agentops/llm_tracker.py @@ -1,7 +1,8 @@ import functools import sys from importlib import import_module -from packaging.version import parse +from importlib.metadata import version +from packaging.version import Version, parse from .log_config import logger from .event import LLMEvent, ErrorEvent from .helpers import get_ISO_time, check_call_stack_for_agent_id @@ -10,6 +11,7 @@ class LlmTracker: SUPPORTED_APIS = { + 'litellm': {'1.3.1': ("openai_chat_completions.completion",)}, 'openai': { '1.0.0': ( "chat.completions.create", @@ -201,7 +203,6 @@ def override_openai_v1_completion(self): # Store the original method original_create = completions.Completions.create - # Define the patched function def patched_function(*args, **kwargs): init_timestamp = get_ISO_time() # Call the original function with its original arguments @@ -216,7 +217,6 @@ def override_openai_v1_async_completion(self): # Store the original method original_create = completions.AsyncCompletions.create - # Define the patched function async def patched_function(*args, **kwargs): # Call the original function with its original arguments @@ -227,6 +227,34 @@ async def patched_function(*args, **kwargs): # Override the original method with the patched one completions.AsyncCompletions.create = patched_function + def override_litellm_completion(self): + import litellm + + original_create = litellm.completion + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + result = original_create(*args, **kwargs) + # Note: litellm calls all LLM APIs using the OpenAI format + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + litellm.completion = patched_function + + def override_litellm_async_completion(self): + import litellm + + original_create = litellm.acompletion + + async def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = await original_create(*args, **kwargs) + # Note: litellm calls all LLM APIs using the OpenAI format + return self._handle_response_v1_openai(result, kwargs, init_timestamp) + + # Override the original method with the patched one + litellm.acompletion = patched_function + def _override_method(self, api, method_path, module): def handle_response(result, kwargs, init_timestamp): if api == "openai": @@ -260,24 +288,31 @@ def sync_method(*args, **kwargs): parent = functools.reduce(getattr, method_parts[:-1], module) setattr(parent, method_parts[-1], new_method) - def override_api(self, api): + def override_api(self): """ Overrides key methods of the specified API to record events. """ - if api in sys.modules: - if api not in self.SUPPORTED_APIS: - raise ValueError(f"Unsupported API: {api}") - - module = import_module(api) - if api == 'openai': - # Patch openai v1.0.0+ methods - if hasattr(module, '__version__'): - module_version = parse(module.__version__) - if module_version >= parse('1.0.0'): - self.override_openai_v1_completion() - self.override_openai_v1_async_completion() - return - - # Patch openai = parse('1.3.1'): + self.override_litellm_completion() + self.override_litellm_async_completion() + else: + logger.warning(f'🖇 AgentOps: Only litellm>=1.3.1 supported. v{module_version} found.') + return # If using an abstraction like litellm, do not patch the underlying LLM APIs + + if api == 'openai': + # Patch openai v1.0.0+ methods + if hasattr(module, '__version__'): + module_version = parse(module.__version__) + if module_version >= parse('1.0.0'): + self.override_openai_v1_completion() + self.override_openai_v1_async_completion() + else: + # Patch openai