diff --git a/README.md b/README.md
index 26c97a5e..80554d1f 100644
--- a/README.md
+++ b/README.md
@@ -249,7 +249,113 @@ agentops.end_session('Success')
 
-### LiteLLM
+### Anthropic ﹨
+
+Track agents built with the Anthropic Python SDK (>=0.32.0).
+
+- [AgentOps integration example](./examples/anthropic/anthropic_example.ipynb)
+- [Official Anthropic documentation](https://docs.anthropic.com/en/docs/welcome)
+
+<details>
+  <summary>Installation</summary>
+
+```bash
+pip install anthropic
+```
+
+```python python
+import os
+
+import anthropic
+import agentops
+
+# Beginning of program's code (i.e. main.py, __init__.py)
+agentops.init()
+
+client = anthropic.Anthropic(
+    # This is the default and can be omitted
+    api_key=os.environ.get("ANTHROPIC_API_KEY"),
+)
+
+message = client.messages.create(
+    max_tokens=1024,
+    messages=[
+        {
+            "role": "user",
+            "content": "Tell me a cool fact about AgentOps",
+        }
+    ],
+    model="claude-3-opus-20240229",
+)
+print(message.content)
+
+agentops.end_session('Success')
+```
+
+Streaming
+```python python
+import os
+
+import anthropic
+import agentops
+
+# Beginning of program's code (i.e. main.py, __init__.py)
+agentops.init()
+
+client = anthropic.Anthropic(
+    # This is the default and can be omitted
+    api_key=os.environ.get("ANTHROPIC_API_KEY"),
+)
+
+stream = client.messages.create(
+    max_tokens=1024,
+    model="claude-3-opus-20240229",
+    messages=[
+        {
+            "role": "user",
+            "content": "Tell me something cool about streaming agents",
+        }
+    ],
+    stream=True,
+)
+
+response = ""
+for event in stream:
+    if event.type == "content_block_delta":
+        response += event.delta.text
+    elif event.type == "message_stop":
+        print("\n")
+        print(response)
+        print("\n")
+```
+
+Async
+
+```python python
+import os
+import asyncio
+
+from anthropic import AsyncAnthropic
+
+client = AsyncAnthropic(
+    # This is the default and can be omitted
+    api_key=os.environ.get("ANTHROPIC_API_KEY"),
+)
+
+
+async def main() -> None:
+    message = await client.messages.create(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Tell me something interesting about async agents",
+            }
+        ],
+        model="claude-3-opus-20240229",
+    )
+    print(message.content)
+
+
+asyncio.run(main())
+```
+</details>
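+
+Async streaming is also supported. The following is a minimal sketch that combines the async client with `stream=True`; it assumes the same `ANTHROPIC_API_KEY` environment variable and `claude-3-opus-20240229` model as the examples above.
+
+```python python
+import os
+import asyncio
+
+import agentops
+import anthropic
+
+# Beginning of program's code (i.e. main.py, __init__.py)
+agentops.init()
+
+client = anthropic.AsyncAnthropic(
+    # This is the default and can be omitted
+    api_key=os.environ.get("ANTHROPIC_API_KEY"),
+)
+
+
+async def main() -> None:
+    # Awaiting create() with stream=True yields an async iterator of events
+    stream = await client.messages.create(
+        max_tokens=1024,
+        model="claude-3-opus-20240229",
+        messages=[
+            {
+                "role": "user",
+                "content": "Tell me something cool about async streaming agents",
+            }
+        ],
+        stream=True,
+    )
+
+    response = ""
+    async for event in stream:
+        if event.type == "content_block_delta":
+            response += event.delta.text
+        elif event.type == "message_stop":
+            print(response)
+
+
+asyncio.run(main())
+
+agentops.end_session('Success')
+```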
+ +### LiteLLM ๐Ÿš… AgentOps provides support for LiteLLM(>=1.3.1), allowing you to call 100+ LLMs using the same Input/Output Format. diff --git a/agentops/client.py b/agentops/client.py index 8878e806..f2fc4dea 100644 --- a/agentops/client.py +++ b/agentops/client.py @@ -19,7 +19,7 @@ from termcolor import colored from .event import Event, ErrorEvent -from .helpers import ( +from .singleton import ( conditional_singleton, ) from .session import Session, active_sessions @@ -27,7 +27,7 @@ from .log_config import logger from .meta_client import MetaClient from .config import Configuration -from .llm_tracker import LlmTracker +from .llms import LlmTracker @conditional_singleton diff --git a/agentops/event.py b/agentops/event.py index a08efcd7..70ec059c 100644 --- a/agentops/event.py +++ b/agentops/event.py @@ -70,7 +70,7 @@ class LLMEvent(Event): thread_id(UUID, optional): The unique identifier of the contextual thread that a message pertains to. prompt(str, list, optional): The message or messages that were used to prompt the LLM. Preferably in ChatML format which is more fully supported by AgentOps. prompt_tokens(int, optional): The number of tokens in the prompt message. - completion(str, object, optional): The message or returned by the LLM. Preferably in ChatML format which is more fully supported by AgentOps. + completion(str, object, optional): The message or messages returned by the LLM. Preferably in ChatML format which is more fully supported by AgentOps. completion_tokens(int, optional): The number of tokens in the completion message. model(str, optional): LLM model e.g. "gpt-4", "gpt-3.5-turbo". diff --git a/agentops/helpers.py b/agentops/helpers.py index e04f4b57..2ec13264 100644 --- a/agentops/helpers.py +++ b/agentops/helpers.py @@ -1,7 +1,6 @@ from pprint import pformat from functools import wraps from datetime import datetime, timezone -import json import inspect from typing import Union import http.client @@ -11,38 +10,6 @@ from .log_config import logger from uuid import UUID from importlib.metadata import version -import subprocess - -ao_instances = {} - - -def singleton(class_): - - def getinstance(*args, **kwargs): - if class_ not in ao_instances: - ao_instances[class_] = class_(*args, **kwargs) - return ao_instances[class_] - - return getinstance - - -def conditional_singleton(class_): - - def getinstance(*args, **kwargs): - use_singleton = kwargs.pop("use_singleton", True) - if use_singleton: - if class_ not in ao_instances: - ao_instances[class_] = class_(*args, **kwargs) - return ao_instances[class_] - else: - return class_(*args, **kwargs) - - return getinstance - - -def clear_singletons(): - global ao_instances - ao_instances = {} def get_ISO_time(): diff --git a/agentops/llm_tracker.py b/agentops/llm_tracker.py deleted file mode 100644 index adadd886..00000000 --- a/agentops/llm_tracker.py +++ /dev/null @@ -1,1057 +0,0 @@ -import functools -import inspect -import pprint -import sys -from importlib import import_module -from importlib.metadata import version -from typing import Optional - -from packaging.version import Version, parse - -from .session import Session -from .event import ActionEvent, ErrorEvent, LLMEvent -from .helpers import check_call_stack_for_agent_id, get_ISO_time -from .log_config import logger -from .event import LLMEvent, ActionEvent, ToolEvent, ErrorEvent -from .helpers import get_ISO_time, check_call_stack_for_agent_id -import inspect -from typing import Optional -import pprint -from .time_travel import ( - 
fetch_completion_override_from_time_travel_cache, - # fetch_prompt_override_from_time_travel_cache, -) - -original_func = {} -original_create = None -original_create_async = None - - -class LlmTracker: - SUPPORTED_APIS = { - "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, - "openai": { - "1.0.0": ("chat.completions.create",), - "0.0.0": ( - "ChatCompletion.create", - "ChatCompletion.acreate", - ), - }, - "cohere": { - "5.4.0": ("chat", "chat_stream"), - }, - "ollama": {"0.0.1": ("chat", "Client.chat", "AsyncClient.chat")}, - "groq": { - "0.9.0": ("Client.chat", "AsyncClient.chat"), - }, - } - - def __init__(self, client): - self.client = client - self.completion = "" - self.llm_event: Optional[LLMEvent] = None - - def _handle_response_v0_openai( - self, response, kwargs, init_timestamp, session: Optional[Session] = None - ): - """Handle responses for OpenAI versions v1.0.0""" - from openai import AsyncStream, Stream - from openai.resources import AsyncCompletions - from openai.types.chat import ChatCompletionChunk - - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - if session is not None: - self.llm_event.session_id = session.session_id - - def handle_stream_chunk(chunk: ChatCompletionChunk): - # NOTE: prompt/completion usage not returned in response when streaming - # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion - if self.llm_event.returns == None: - self.llm_event.returns = chunk - - try: - accumulated_delta = self.llm_event.returns.choices[0].delta - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.model = chunk.model - self.llm_event.prompt = kwargs["messages"] - - # NOTE: We assume for completion only choices[0] is relevant - choice = chunk.choices[0] - - if choice.delta.content: - accumulated_delta.content += choice.delta.content - - if choice.delta.role: - accumulated_delta.role = choice.delta.role - - if choice.delta.tool_calls: - accumulated_delta.tool_calls = choice.delta.tool_calls - - if choice.delta.function_call: - accumulated_delta.function_call = choice.delta.function_call - - if choice.finish_reason: - # Streaming is done. Record LLMEvent - self.llm_event.returns.choices[0].finish_reason = ( - choice.finish_reason - ) - self.llm_event.completion = { - "role": accumulated_delta.role, - "content": accumulated_delta.content, - "function_call": accumulated_delta.function_call, - "tool_calls": accumulated_delta.tool_calls, - } - self.llm_event.end_timestamp = get_ISO_time() - - self._safe_record(session, self.llm_event) - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - - kwargs_str = pprint.pformat(kwargs) - chunk = pprint.pformat(chunk) - logger.warning( - f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" - f"chunk:\n {chunk}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - # if the response is a generator, decorate the generator - if isinstance(response, Stream): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - # For asynchronous AsyncStream - elif isinstance(response, AsyncStream): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # For async AsyncCompletion - elif isinstance(response, AsyncCompletions): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # v1.0.0+ responses are objects - try: - self.llm_event.returns = response - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = kwargs["messages"] - self.llm_event.prompt_tokens = response.usage.prompt_tokens - self.llm_event.completion = response.choices[0].message.model_dump() - self.llm_event.completion_tokens = response.usage.completion_tokens - self.llm_event.model = response.model - - self._safe_record(session, self.llm_event) - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - - kwargs_str = pprint.pformat(kwargs) - response = pprint.pformat(response) - logger.warning( - f"Unable to parse response for LLM call. Skipping upload to AgentOps\n" - f"response:\n {response}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - return response - - def _handle_response_cohere( - self, response, kwargs, init_timestamp, session: Optional[Session] = None - ): - """Handle responses for Cohere versions >v5.4.0""" - from cohere.types.non_streamed_chat_response import NonStreamedChatResponse - from cohere.types.streamed_chat_response import ( - StreamedChatResponse, - StreamedChatResponse_CitationGeneration, - StreamedChatResponse_SearchQueriesGeneration, - StreamedChatResponse_SearchResults, - StreamedChatResponse_StreamEnd, - StreamedChatResponse_StreamStart, - StreamedChatResponse_TextGeneration, - StreamedChatResponse_ToolCallsGeneration, - ) - - # from cohere.types.chat import ChatGenerationChunk - # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant" - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - if session is not None: - self.llm_event.session_id = session.session_id - - self.action_events = {} - - def handle_stream_chunk(chunk, session: Optional[Session] = None): - - # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion - if isinstance(chunk, StreamedChatResponse_StreamStart): - self.llm_event.returns = chunk - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.model = kwargs.get("model", "command-r-plus") - self.llm_event.prompt = kwargs["message"] - self.llm_event.completion = "" - return - - try: - if isinstance(chunk, StreamedChatResponse_StreamEnd): - # StreamedChatResponse_TextGeneration = LLMEvent - self.llm_event.completion = { - "role": "assistant", - "content": chunk.response.text, - } - self.llm_event.end_timestamp = get_ISO_time() - self._safe_record(session, self.llm_event) - - # StreamedChatResponse_SearchResults = ActionEvent - search_results = chunk.response.search_results - for search_result in search_results: - query = search_result.search_query - if query.generation_id in self.action_events: - action_event = 
self.action_events[query.generation_id] - search_result_dict = search_result.dict() - del search_result_dict["search_query"] - action_event.returns = search_result_dict - action_event.end_timestamp = get_ISO_time() - - # StreamedChatResponse_CitationGeneration = ActionEvent - documents = {doc["id"]: doc for doc in chunk.response.documents} - citations = chunk.response.citations - for citation in citations: - citation_id = f"{citation.start}.{citation.end}" - if citation_id in self.action_events: - action_event = self.action_events[citation_id] - citation_dict = citation.dict() - # Replace document_ids with the actual documents - citation_dict["documents"] = [ - documents[doc_id] - for doc_id in citation_dict["document_ids"] - if doc_id in documents - ] - del citation_dict["document_ids"] - - action_event.returns = citation_dict - action_event.end_timestamp = get_ISO_time() - - for key, action_event in self.action_events.items(): - self._safe_record(session, action_event) - - elif isinstance(chunk, StreamedChatResponse_TextGeneration): - self.llm_event.completion += chunk.text - elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration): - pass - elif isinstance(chunk, StreamedChatResponse_CitationGeneration): - for citation in chunk.citations: - self.action_events[f"{citation.start}.{citation.end}"] = ( - ActionEvent( - action_type="citation", - init_timestamp=get_ISO_time(), - params=citation.text, - ) - ) - elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration): - for query in chunk.search_queries: - self.action_events[query.generation_id] = ActionEvent( - action_type="search_query", - init_timestamp=get_ISO_time(), - params=query.text, - ) - elif isinstance(chunk, StreamedChatResponse_SearchResults): - pass - - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - - kwargs_str = pprint.pformat(kwargs) - chunk = pprint.pformat(chunk) - logger.warning( - f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" - f"chunk:\n {chunk}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - # NOTE: As of Cohere==5.x.x, async is not supported - # if the response is a generator, decorate the generator - if inspect.isasyncgen(response): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - elif inspect.isgenerator(response): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - # TODO: we should record if they pass a chat.connectors, because it means they intended to call a tool - # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called - - try: - self.llm_event.returns = response - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = [] - if response.chat_history: - role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"} - - for i in range(len(response.chat_history) - 1): - message = response.chat_history[i] - self.llm_event.prompt.append( - { - "role": role_map.get(message.role, message.role), - "content": message.message, - } - ) - - last_message = response.chat_history[-1] - self.llm_event.completion = { - "role": role_map.get(last_message.role, last_message.role), - "content": last_message.message, - } - self.llm_event.prompt_tokens = response.meta.tokens.input_tokens - self.llm_event.completion_tokens = response.meta.tokens.output_tokens - self.llm_event.model = kwargs.get("model", "command-r-plus") - - self._safe_record(session, self.llm_event) - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - kwargs_str = pprint.pformat(kwargs) - response = pprint.pformat(response) - logger.warning( - f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" - f"response:\n {response}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - return response - - def _handle_response_ollama( - self, response, kwargs, init_timestamp, session: Optional[Session] = None - ) -> None: - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - - def handle_stream_chunk(chunk: dict): - message = chunk.get("message", {"role": None, "content": ""}) - - if chunk.get("done"): - self.llm_event.completion["content"] += message.get("content") - self.llm_event.end_timestamp = get_ISO_time() - self.llm_event.model = f'ollama/{chunk.get("model")}' - self.llm_event.returns = chunk - self.llm_event.returns["message"] = self.llm_event.completion - self.llm_event.prompt = kwargs["messages"] - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.client.record(self.llm_event) - - if self.llm_event.completion is None: - self.llm_event.completion = message - else: - self.llm_event.completion["content"] += message.get("content") - - if inspect.isgenerator(response): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - self.llm_event.end_timestamp = get_ISO_time() - - self.llm_event.model = f'ollama/{response["model"]}' - self.llm_event.returns = response - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = kwargs["messages"] - self.llm_event.completion = response["message"] - - self._safe_record(session, self.llm_event) - return response - - def _handle_response_groq( - self, response, kwargs, init_timestamp, session: Optional[Session] = None - ): - """Handle responses for OpenAI versions >v1.0.0""" - from groq import AsyncStream, Stream - from groq.resources.chat import AsyncCompletions - from groq.types.chat import ChatCompletionChunk - - self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) - if session is not None: - self.llm_event.session_id = session.session_id - - def handle_stream_chunk(chunk: ChatCompletionChunk): - # NOTE: prompt/completion usage not returned in response when streaming - # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion - if self.llm_event.returns == None: - self.llm_event.returns = chunk - - try: - accumulated_delta = self.llm_event.returns.choices[0].delta - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.model = chunk.model - self.llm_event.prompt = kwargs["messages"] - - # NOTE: We assume for completion only choices[0] is relevant - choice = chunk.choices[0] - - if choice.delta.content: - accumulated_delta.content += choice.delta.content - - if choice.delta.role: - accumulated_delta.role = choice.delta.role - - if choice.delta.tool_calls: - accumulated_delta.tool_calls = choice.delta.tool_calls - - if choice.delta.function_call: - accumulated_delta.function_call = choice.delta.function_call - - if choice.finish_reason: - # Streaming is done. 
Record LLMEvent - self.llm_event.returns.choices[0].finish_reason = ( - choice.finish_reason - ) - self.llm_event.completion = { - "role": accumulated_delta.role, - "content": accumulated_delta.content, - "function_call": accumulated_delta.function_call, - "tool_calls": accumulated_delta.tool_calls, - } - self.llm_event.end_timestamp = get_ISO_time() - - self._safe_record(session, self.llm_event) - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - - kwargs_str = pprint.pformat(kwargs) - chunk = pprint.pformat(chunk) - logger.warning( - f"Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n" - f"chunk:\n {chunk}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - # if the response is a generator, decorate the generator - if isinstance(response, Stream): - - def generator(): - for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return generator() - - # For asynchronous AsyncStream - elif isinstance(response, AsyncStream): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # For async AsyncCompletion - elif isinstance(response, AsyncCompletions): - - async def async_generator(): - async for chunk in response: - handle_stream_chunk(chunk) - yield chunk - - return async_generator() - - # v1.0.0+ responses are objects - try: - self.llm_event.returns = response.model_dump() - self.llm_event.agent_id = check_call_stack_for_agent_id() - self.llm_event.prompt = kwargs["messages"] - self.llm_event.prompt_tokens = response.usage.prompt_tokens - self.llm_event.completion = response.choices[0].message.model_dump() - self.llm_event.completion_tokens = response.usage.completion_tokens - self.llm_event.model = response.model - - self._safe_record(session, self.llm_event) - except Exception as e: - self._safe_record( - session, ErrorEvent(trigger_event=self.llm_event, exception=e) - ) - - kwargs_str = pprint.pformat(kwargs) - response = pprint.pformat(response) - logger.warning( - f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" - f"response:\n {response}\n" - f"kwargs:\n {kwargs_str}\n" - ) - - return response - - def override_openai_v1_completion(self): - from openai.resources.chat import completions - from openai.types.chat import ChatCompletion, ChatCompletionChunk - - # Store the original method - global original_create - original_create = completions.Completions.create - - def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - - completion_override = fetch_completion_override_from_time_travel_cache( - kwargs - ) - if completion_override: - result_model = None - pydantic_models = (ChatCompletion, ChatCompletionChunk) - for pydantic_model in pydantic_models: - try: - result_model = pydantic_model.model_validate_json( - completion_override - ) - break - except Exception as e: - pass - - if result_model is None: - logger.error( - f"Time Travel: Pydantic validation failed for {pydantic_models} \n" - f"Time Travel: Completion override was:\n" - f"{pprint.pformat(completion_override)}" - ) - return None - return self._handle_response_v1_openai( - result_model, kwargs, init_timestamp, session=session - ) - - # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) - # if prompt_override: - # kwargs["messages"] = prompt_override["messages"] - - # Call the original function with its original arguments - result = original_create(*args, **kwargs) - return self._handle_response_v1_openai( - result, kwargs, init_timestamp, session=session - ) - - # Override the original method with the patched one - completions.Completions.create = patched_function - - def override_openai_v1_async_completion(self): - from openai.resources.chat import completions - from openai.types.chat import ChatCompletion, ChatCompletionChunk - - # Store the original method - global original_create_async - original_create_async = completions.AsyncCompletions.create - - async def patched_function(*args, **kwargs): - - init_timestamp = get_ISO_time() - - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - - completion_override = fetch_completion_override_from_time_travel_cache( - kwargs - ) - if completion_override: - result_model = None - pydantic_models = (ChatCompletion, ChatCompletionChunk) - for pydantic_model in pydantic_models: - try: - result_model = pydantic_model.model_validate_json( - completion_override - ) - break - except Exception as e: - pass - - if result_model is None: - logger.error( - f"Time Travel: Pydantic validation failed for {pydantic_models} \n" - f"Time Travel: Completion override was:\n" - f"{pprint.pformat(completion_override)}" - ) - return None - return self._handle_response_v1_openai( - result_model, kwargs, init_timestamp, session=session - ) - - # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) - # if prompt_override: - # kwargs["messages"] = prompt_override["messages"] - - # Call the original function with its original arguments - result = await original_create_async(*args, **kwargs) - return self._handle_response_v1_openai( - result, kwargs, init_timestamp, session=session - ) - - # Override the original method with the patched one - completions.AsyncCompletions.create = patched_function - - def override_litellm_completion(self): - import litellm - from openai.types.chat import ( - ChatCompletion, - ) # Note: litellm calls all LLM APIs using the OpenAI format - - original_create = 
litellm.completion - - def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - - completion_override = fetch_completion_override_from_time_travel_cache( - kwargs - ) - if completion_override: - result_model = ChatCompletion.model_validate_json(completion_override) - return self._handle_response_v1_openai( - result_model, kwargs, init_timestamp, session=session - ) - - # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) - # if prompt_override: - # kwargs["messages"] = prompt_override["messages"] - - # Call the original function with its original arguments - result = original_create(*args, **kwargs) - return self._handle_response_v1_openai( - result, kwargs, init_timestamp, session=session - ) - - litellm.completion = patched_function - - def override_litellm_async_completion(self): - import litellm - from openai.types.chat import ( - ChatCompletion, - ) # Note: litellm calls all LLM APIs using the OpenAI format - - original_create_async = litellm.acompletion - - async def patched_function(*args, **kwargs): - init_timestamp = get_ISO_time() - - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - - completion_override = fetch_completion_override_from_time_travel_cache( - kwargs - ) - if completion_override: - result_model = ChatCompletion.model_validate_json(completion_override) - return self._handle_response_v1_openai( - result_model, kwargs, init_timestamp, session=session - ) - - # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) - # if prompt_override: - # kwargs["messages"] = prompt_override["messages"] - - # Call the original function with its original arguments - result = await original_create_async(*args, **kwargs) - return self._handle_response_v1_openai( - result, kwargs, init_timestamp, session=session - ) - - # Override the original method with the patched one - litellm.acompletion = patched_function - - def override_cohere_chat(self): - import cohere - import cohere.types - - original_chat = cohere.Client.chat - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - result = original_chat(*args, **kwargs) - return self._handle_response_cohere( - result, kwargs, init_timestamp, session=session - ) - - # Override the original method with the patched one - cohere.Client.chat = patched_function - - def override_cohere_chat_stream(self): - import cohere - - original_chat = cohere.Client.chat_stream - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_chat(*args, **kwargs) - return self._handle_response_cohere(result, kwargs, init_timestamp) - - # Override the original method with the patched one - cohere.Client.chat_stream = patched_function - - def override_ollama_chat(self): - import ollama - - original_func["ollama.chat"] = ollama.chat - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_func["ollama.chat"](*args, **kwargs) - return self._handle_response_ollama( - result, kwargs, init_timestamp, session=kwargs.get("session", None) - ) - - # Override the original method with the patched one - ollama.chat 
= patched_function - - def override_ollama_chat_client(self): - from ollama import Client - - original_func["ollama.Client.chat"] = Client.chat - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_func["ollama.Client.chat"](*args, **kwargs) - return self._handle_response_ollama(result, kwargs, init_timestamp) - - # Override the original method with the patched one - Client.chat = patched_function - - def override_ollama_chat_async_client(self): - from ollama import AsyncClient - - original_func["ollama.AsyncClient.chat"] = AsyncClient.chat - - async def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = await original_func["ollama.AsyncClient.chat"](*args, **kwargs) - return self._handle_response_ollama(result, kwargs, init_timestamp) - - # Override the original method with the patched one - AsyncClient.chat = patched_function - - def override_groq_chat(self): - from groq.resources.chat import completions - - original_create = completions.Completions.create - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - session = kwargs.get("session", None) - if "session" in kwargs.keys(): - del kwargs["session"] - result = original_create(*args, **kwargs) - return self._handle_response_groq( - result, kwargs, init_timestamp, session=session - ) - - # Override the original method with the patched one - completions.Completions.create = patched_function - - def override_groq_chat_stream(self): - from groq.resources.chat import completions - - original_create = completions.AsyncCompletions.create - - def patched_function(*args, **kwargs): - # Call the original function with its original arguments - init_timestamp = get_ISO_time() - result = original_create(*args, **kwargs) - return self._handle_response_groq(result, kwargs, init_timestamp) - - # Override the original method with the patched one - completions.AsyncCompletions.create = patched_function - - def _override_method(self, api, method_path, module): - def handle_response(result, kwargs, init_timestamp): - if api == "openai": - return self._handle_response_v0_openai(result, kwargs, init_timestamp) - return result - - def wrap_method(original_method): - if inspect.iscoroutinefunction(original_method): - - @functools.wraps(original_method) - async def async_method(*args, **kwargs): - init_timestamp = get_ISO_time() - response = await original_method(*args, **kwargs) - return handle_response(response, kwargs, init_timestamp) - - return async_method - - else: - - @functools.wraps(original_method) - def sync_method(*args, **kwargs): - init_timestamp = get_ISO_time() - response = original_method(*args, **kwargs) - return handle_response(response, kwargs, init_timestamp) - - return sync_method - - method_parts = method_path.split(".") - original_method = functools.reduce(getattr, method_parts, module) - new_method = wrap_method(original_method) - - if len(method_parts) == 1: - setattr(module, method_parts[0], new_method) - else: - parent = functools.reduce(getattr, method_parts[:-1], module) - setattr(parent, method_parts[-1], new_method) - - def override_api(self): - """ - Overrides key methods of the specified API to record events. 
- """ - - for api in self.SUPPORTED_APIS: - if api in sys.modules: - module = import_module(api) - if api == "litellm": - module_version = version(api) - if module_version is None: - logger.warning( - f"Cannot determine LiteLLM version. Only LiteLLM>=1.3.1 supported." - ) - - if Version(module_version) >= parse("1.3.1"): - self.override_litellm_completion() - self.override_litellm_async_completion() - else: - logger.warning( - f"Only LiteLLM>=1.3.1 supported. v{module_version} found." - ) - return # If using an abstraction like litellm, do not patch the underlying LLM APIs - - if api == "openai": - # Patch openai v1.0.0+ methods - if hasattr(module, "__version__"): - module_version = parse(module.__version__) - if module_version >= parse("1.0.0"): - self.override_openai_v1_completion() - self.override_openai_v1_async_completion() - else: - # Patch openai =5.4.0 supported." - ) - - if Version(module_version) >= parse("5.4.0"): - self.override_cohere_chat() - self.override_cohere_chat_stream() - else: - logger.warning( - f"Only Cohere>=5.4.0 supported. v{module_version} found." - ) - - if api == "ollama": - module_version = version(api) - - if Version(module_version) >= parse("0.0.1"): - self.override_ollama_chat() - self.override_ollama_chat_client() - self.override_ollama_chat_async_client() - else: - logger.warning( - f"Only Ollama>=0.0.1 supported. v{module_version} found." - ) - - if api == "groq": - module_version = version(api) - - if Version(module_version) >= parse("0.9.0"): - self.override_groq_chat() - self.override_groq_chat_stream() - else: - logger.warning( - f"Only Groq>=0.9.0 supported. v{module_version} found." - ) - - def stop_instrumenting(self): - self.undo_override_openai_v1_async_completion() - self.undo_override_openai_v1_completion() - self.undo_override_ollama() - - def undo_override_openai_v1_completion(self): - global original_create - from openai.resources.chat import completions - - completions.Completions.create = original_create - - def undo_override_openai_v1_async_completion(self): - global original_create_async - from openai.resources.chat import completions - - completions.AsyncCompletions.create = original_create_async - - def undo_override_ollama(self): - if "ollama" in sys.modules: - import ollama - - ollama.chat = original_func["ollama.chat"] - ollama.Client.chat = original_func["ollama.Client.chat"] - ollama.AsyncClient.chat = original_func["ollama.AsyncClient.chat"] - - def _safe_record(self, session, event): - if session is not None: - session.record(event) - else: - self.client.record(event) diff --git a/agentops/llms/__init__.py b/agentops/llms/__init__.py new file mode 100644 index 00000000..e3e6f7cf --- /dev/null +++ b/agentops/llms/__init__.py @@ -0,0 +1,145 @@ +import functools +import sys +from importlib import import_module +from importlib.metadata import version + +from packaging.version import Version, parse + +from ..log_config import logger + +from .cohere import CohereProvider +from .groq import GroqProvider +from .litellm import LiteLLMProvider +from .ollama import OllamaProvider +from .openai import OpenAiProvider +from .anthropic import AnthropicProvider + +original_func = {} +original_create = None +original_create_async = None + + +class LlmTracker: + SUPPORTED_APIS = { + "litellm": {"1.3.1": ("openai_chat_completions.completion",)}, + "openai": { + "1.0.0": ("chat.completions.create",), + "0.0.0": ( + "ChatCompletion.create", + "ChatCompletion.acreate", + ), + }, + "cohere": { + "5.4.0": ("chat", "chat_stream"), + }, + "ollama": 
{"0.0.1": ("chat", "Client.chat", "AsyncClient.chat")}, + "groq": { + "0.9.0": ("Client.chat", "AsyncClient.chat"), + }, + "anthropic": { + "0.32.0": ("completions.create",), + }, + } + + def __init__(self, client): + self.client = client + self.completion = "" + + def override_api(self): + """ + Overrides key methods of the specified API to record events. + """ + + for api in self.SUPPORTED_APIS: + if api in sys.modules: + module = import_module(api) + if api == "litellm": + module_version = version(api) + if module_version is None: + logger.warning( + f"Cannot determine LiteLLM version. Only LiteLLM>=1.3.1 supported." + ) + + if Version(module_version) >= parse("1.3.1"): + provider = LiteLLMProvider(self.client) + provider.override() + else: + logger.warning( + f"Only LiteLLM>=1.3.1 supported. v{module_version} found." + ) + return # If using an abstraction like litellm, do not patch the underlying LLM APIs + + if api == "openai": + # Patch openai v1.0.0+ methods + if hasattr(module, "__version__"): + module_version = parse(module.__version__) + if module_version >= parse("1.0.0"): + provider = OpenAiProvider(self.client) + provider.override() + else: + raise DeprecationWarning( + "OpenAI versions < 0.1 are no longer supported by AgentOps. Please upgrade OpenAI or " + "downgrade AgentOps to <=0.3.8." + ) + + if api == "cohere": + # Patch cohere v5.4.0+ methods + module_version = version(api) + if module_version is None: + logger.warning( + f"Cannot determine Cohere version. Only Cohere>=5.4.0 supported." + ) + + if Version(module_version) >= parse("5.4.0"): + provider = CohereProvider(self.client) + provider.override() + else: + logger.warning( + f"Only Cohere>=5.4.0 supported. v{module_version} found." + ) + + if api == "ollama": + module_version = version(api) + + if Version(module_version) >= parse("0.0.1"): + provider = OllamaProvider(self.client) + provider.override() + else: + logger.warning( + f"Only Ollama>=0.0.1 supported. v{module_version} found." + ) + + if api == "groq": + module_version = version(api) + + if Version(module_version) >= parse("0.9.0"): + provider = GroqProvider(self.client) + provider.override() + else: + logger.warning( + f"Only Groq>=0.9.0 supported. v{module_version} found." + ) + + if api == "anthropic": + module_version = version(api) + + if module_version is None: + logger.warning( + f"Cannot determine Anthropic version. Only Anthropic>=0.32.0 supported." + ) + + if Version(module_version) >= parse("0.32.0"): + provider = AnthropicProvider(self.client) + provider.override() + else: + logger.warning( + f"Only Anthropic>=0.32.0 supported. v{module_version} found." 
+ ) + + def stop_instrumenting(self): + OpenAiProvider(self.client).undo_override() + GroqProvider(self.client).undo_override() + CohereProvider(self.client).undo_override() + LiteLLMProvider(self.client).undo_override() + OllamaProvider(self.client).undo_override() + AnthropicProvider(self.client).undo_override() diff --git a/agentops/llms/anthropic.py b/agentops/llms/anthropic.py new file mode 100644 index 00000000..cbefe054 --- /dev/null +++ b/agentops/llms/anthropic.py @@ -0,0 +1,289 @@ +import pprint +from typing import Optional + +from agentops.llms.instrumented_provider import InstrumentedProvider +from agentops.time_travel import fetch_completion_override_from_time_travel_cache + +from ..event import ErrorEvent, LLMEvent, ToolEvent +from ..session import Session +from ..log_config import logger +from ..helpers import check_call_stack_for_agent_id, get_ISO_time +from ..singleton import singleton + + +@singleton +class AnthropicProvider(InstrumentedProvider): + + original_create = None + original_create_async = None + + def __init__(self, client): + super().__init__(client) + self._provider_name = "Anthropic" + self.tool_event = {} + self.tool_id = "" + + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ): + """Handle responses for Anthropic""" + from anthropic import Stream, AsyncStream + from anthropic.resources import AsyncMessages + from anthropic.types import Message + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + if session is not None: + self.llm_event.session_id = session.session_id + + def handle_stream_chunk(chunk: Message): + try: + # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if chunk.type == "message_start": + self.llm_event.returns = chunk + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = kwargs["model"] + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = chunk.message.usage.input_tokens + self.llm_event.completion = { + "role": chunk.message.role, + "content": "", # Always returned as [] in this instance type + } + + elif chunk.type == "content_block_start": + if chunk.content_block.type == "text": + self.llm_event.completion["content"] += chunk.content_block.text + + elif chunk.content_block.type == "tool_use": + self.tool_id = chunk.content_block.id + self.tool_event[self.tool_id] = ToolEvent( + name=chunk.content_block.name, + logs={"type": chunk.content_block.type, "input": ""}, + ) + + elif chunk.type == "content_block_delta": + if chunk.delta.type == "text_delta": + self.llm_event.completion["content"] += chunk.delta.text + + elif chunk.delta.type == "input_json_delta": + self.tool_event[self.tool_id].logs[ + "input" + ] += chunk.delta.partial_json + + elif chunk.type == "content_block_stop": + pass + + elif chunk.type == "message_delta": + self.llm_event.completion_tokens = chunk.usage.output_tokens + + elif chunk.type == "message_stop": + self.llm_event.end_timestamp = get_ISO_time() + self._safe_record(session, self.llm_event) + + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n", + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + if isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncMessages + if isinstance(response, AsyncMessages): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # Handle object responses + try: + self.llm_event.returns = response.model_dump() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.input_tokens + self.llm_event.completion = { + "role": "assistant", + "content": response.content[0].text, + } + self.llm_event.completion_tokens = response.usage.output_tokens + self.llm_event.model = response.model + self.llm_event.end_timestamp = get_ISO_time() + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def override(self): + self._override_completion() + self._override_async_completion() + + def _override_completion(self): + from anthropic.resources import messages + from anthropic.types import ( + Message, + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawContentBlockStopEvent, + RawMessageDeltaEvent, + RawMessageStartEvent, + RawMessageStopEvent, + ) + + # Store the original method + self.original_create = messages.Messages.create + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if completion_override: + result_model = None + pydantic_models = ( + Message, + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawContentBlockStopEvent, + RawMessageDeltaEvent, + RawMessageStartEvent, + RawMessageStopEvent, + ) + + for pydantic_model in pydantic_models: + try: + result_model = pydantic_model.model_validate_json( + completion_override + ) + break + except Exception as e: + pass + + if result_model is None: + logger.error( + f"Time Travel: Pydantic validation failed for {pydantic_models} \n" + f"Time Travel: Completion override was:\n" + f"{pprint.pformat(completion_override)}" + ) + return None + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + # Call the original function with its original arguments + result = self.original_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + messages.Messages.create = patched_function + + def _override_async_completion(self): + from anthropic.resources import messages + from anthropic.types import ( + Message, + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + 
RawContentBlockStopEvent, + RawMessageDeltaEvent, + RawMessageStartEvent, + RawMessageStopEvent, + ) + + # Store the original method + self.original_create_async = messages.AsyncMessages.create + + async def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if completion_override: + result_model = None + pydantic_models = ( + Message, + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawContentBlockStopEvent, + RawMessageDeltaEvent, + RawMessageStartEvent, + RawMessageStopEvent, + ) + + for pydantic_model in pydantic_models: + try: + result_model = pydantic_model.model_validate_json( + completion_override + ) + break + except Exception as e: + pass + + if result_model is None: + logger.error( + f"Time Travel: Pydantic validation failed for {pydantic_models} \n" + f"Time Travel: Completion override was:\n" + f"{pprint.pformat(completion_override)}" + ) + return None + + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + result = await self.original_create_async(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + messages.AsyncMessages.create = patched_function + + def undo_override(self): + from anthropic.resources import messages + + messages.Messages.create = self.original_create + messages.AsyncMessages.create = self.original_create_async diff --git a/agentops/llms/cohere.py b/agentops/llms/cohere.py new file mode 100644 index 00000000..79f2cb04 --- /dev/null +++ b/agentops/llms/cohere.py @@ -0,0 +1,258 @@ +import inspect +import pprint +from typing import Optional + +from .instrumented_provider import InstrumentedProvider +from ..event import ActionEvent, ErrorEvent, LLMEvent +from ..session import Session +from ..log_config import logger +from agentops.helpers import get_ISO_time, check_call_stack_for_agent_id +from ..singleton import singleton + + +@singleton +class CohereProvider(InstrumentedProvider): + original_create = None + original_create_stream = None + original_create_async = None + + def override(self): + self._override_chat() + self._override_chat_stream() + self._override_async_chat() + + def undo_override(self): + import cohere + + cohere.Client.chat = self.original_create + cohere.Client.chat_stream = self.original_create_stream + cohere.AsyncClient.chat = self.original_create_async + + def __init__(self, client): + super().__init__(client) + + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ): + """Handle responses for Cohere versions >v5.4.0""" + from cohere.types.streamed_chat_response import ( + StreamedChatResponse_CitationGeneration, + StreamedChatResponse_SearchQueriesGeneration, + StreamedChatResponse_SearchResults, + StreamedChatResponse_StreamEnd, + StreamedChatResponse_StreamStart, + StreamedChatResponse_TextGeneration, + StreamedChatResponse_ToolCallsGeneration, + ) + + # from cohere.types.chat import ChatGenerationChunk + # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant" + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + if session is not None: + self.llm_event.session_id = session.session_id + + self.action_events = 
{} + + def handle_stream_chunk(chunk, session: Optional[Session] = None): + + # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if isinstance(chunk, StreamedChatResponse_StreamStart): + self.llm_event.returns = chunk + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = kwargs.get("model", "command-r-plus") + self.llm_event.prompt = kwargs["message"] + self.llm_event.completion = "" + return + + try: + if isinstance(chunk, StreamedChatResponse_StreamEnd): + # StreamedChatResponse_TextGeneration = LLMEvent + self.llm_event.completion = { + "role": "assistant", + "content": chunk.response.text, + } + self.llm_event.end_timestamp = get_ISO_time() + self._safe_record(session, self.llm_event) + + # StreamedChatResponse_SearchResults = ActionEvent + search_results = chunk.response.search_results + if search_results: + for search_result in search_results: + query = search_result.search_query + if query.generation_id in self.action_events: + action_event = self.action_events[query.generation_id] + search_result_dict = search_result.dict() + del search_result_dict["search_query"] + action_event.returns = search_result_dict + action_event.end_timestamp = get_ISO_time() + + # StreamedChatResponse_CitationGeneration = ActionEvent + if chunk.response.documents: + documents = {doc["id"]: doc for doc in chunk.response.documents} + citations = chunk.response.citations + for citation in citations: + citation_id = f"{citation.start}.{citation.end}" + if citation_id in self.action_events: + action_event = self.action_events[citation_id] + citation_dict = citation.dict() + # Replace document_ids with the actual documents + citation_dict["documents"] = [ + documents[doc_id] + for doc_id in citation_dict["document_ids"] + if doc_id in documents + ] + del citation_dict["document_ids"] + + action_event.returns = citation_dict + action_event.end_timestamp = get_ISO_time() + + for key, action_event in self.action_events.items(): + self._safe_record(session, action_event) + + elif isinstance(chunk, StreamedChatResponse_TextGeneration): + self.llm_event.completion += chunk.text + elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration): + pass + elif isinstance(chunk, StreamedChatResponse_CitationGeneration): + for citation in chunk.citations: + self.action_events[f"{citation.start}.{citation.end}"] = ( + ActionEvent( + action_type="citation", + init_timestamp=get_ISO_time(), + params=citation.text, + ) + ) + elif isinstance(chunk, StreamedChatResponse_SearchQueriesGeneration): + for query in chunk.search_queries: + self.action_events[query.generation_id] = ActionEvent( + action_type="search_query", + init_timestamp=get_ISO_time(), + params=query.text, + ) + elif isinstance(chunk, StreamedChatResponse_SearchResults): + pass + + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + raise e + + # NOTE: As of Cohere==5.x.x, async is not supported + # if the response is a generator, decorate the generator + if inspect.isasyncgen(response): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + elif inspect.isgenerator(response): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # TODO: we should record if they pass a chat.connectors, because it means they intended to call a tool + # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called + + try: + self.llm_event.returns = response + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = [] + if response.chat_history: + role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"} + + for i in range(len(response.chat_history) - 1): + message = response.chat_history[i] + self.llm_event.prompt.append( + { + "role": role_map.get(message.role, message.role), + "content": message.message, + } + ) + + last_message = response.chat_history[-1] + self.llm_event.completion = { + "role": role_map.get(last_message.role, last_message.role), + "content": last_message.message, + } + self.llm_event.prompt_tokens = response.meta.tokens.input_tokens + self.llm_event.completion_tokens = response.meta.tokens.output_tokens + self.llm_event.model = kwargs.get("model", "command-r-plus") + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _override_chat(self): + import cohere + + self.original_create = cohere.Client.chat + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + result = self.original_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + cohere.Client.chat = patched_function + + def _override_async_chat(self): + import cohere.types + + self.original_create_async = cohere.AsyncClient.chat + + async def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + result = await self.original_create_async(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + cohere.AsyncClient.chat = patched_function + + def _override_chat_stream(self): + import cohere + + self.original_create_stream = cohere.Client.chat_stream + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = self.original_create_stream(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp) + + # Override the original method with the patched one + cohere.Client.chat_stream = patched_function diff --git a/agentops/llms/groq.py b/agentops/llms/groq.py new file mode 100644 index 00000000..54e61473 --- /dev/null +++ b/agentops/llms/groq.py @@ -0,0 +1,182 @@ +import pprint +from typing import Optional + +from .instrumented_provider import InstrumentedProvider +from ..event import ErrorEvent, LLMEvent +from ..session import Session +from ..log_config import logger +from agentops.helpers import get_ISO_time, check_call_stack_for_agent_id +from ..singleton import singleton + + +@singleton +class GroqProvider(InstrumentedProvider): + original_create = None + original_async_create = None + + def __init__(self, client): + super().__init__(client) + self.client = client + + def override(self): + self._override_chat() + self._override_async_chat() + + def undo_override(self): + from groq.resources.chat import completions + + completions.Completions.create = self.original_create + completions.AsyncCompletions.create = self.original_create + + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ): + """Handle responses for OpenAI versions >v1.0.0""" + from groq import AsyncStream, Stream + from groq.resources.chat import AsyncCompletions + from groq.types.chat import ChatCompletionChunk + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + if session is not None: + self.llm_event.session_id = session.session_id + + def handle_stream_chunk(chunk: ChatCompletionChunk): + # NOTE: prompt/completion usage not returned in response when streaming + # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if self.llm_event.returns == None: + self.llm_event.returns = chunk + + try: + accumulated_delta = self.llm_event.returns.choices[0].delta + 
self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = chunk.model + self.llm_event.prompt = kwargs["messages"] + + # NOTE: We assume for completion only choices[0] is relevant + choice = chunk.choices[0] + + if choice.delta.content: + accumulated_delta.content += choice.delta.content + + if choice.delta.role: + accumulated_delta.role = choice.delta.role + + if choice.delta.tool_calls: + accumulated_delta.tool_calls = choice.delta.tool_calls + + if choice.delta.function_call: + accumulated_delta.function_call = choice.delta.function_call + + if choice.finish_reason: + # Streaming is done. Record LLMEvent + self.llm_event.returns.choices[0].finish_reason = ( + choice.finish_reason + ) + self.llm_event.completion = { + "role": accumulated_delta.role, + "content": accumulated_delta.content, + "function_call": accumulated_delta.function_call, + "tool_calls": accumulated_delta.tool_calls, + } + self.llm_event.end_timestamp = get_ISO_time() + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + elif isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncCompletion + elif isinstance(response, AsyncCompletions): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # v1.0.0+ responses are objects + try: + self.llm_event.returns = response.model_dump() + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.prompt_tokens + self.llm_event.completion = response.choices[0].message.model_dump() + self.llm_event.completion_tokens = response.usage.completion_tokens + self.llm_event.model = response.model + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _override_chat(self): + from groq.resources.chat import completions + + self.original_create = completions.Completions.create + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + result = self.original_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + completions.Completions.create = patched_function + + def _override_async_chat(self): + from groq.resources.chat import completions + + self.original_async_create = completions.AsyncCompletions.create + + async def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = await self.original_async_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp) + + # Override the original method with the patched one + completions.AsyncCompletions.create = patched_function diff --git a/agentops/llms/instrumented_provider.py b/agentops/llms/instrumented_provider.py new file mode 100644 index 00000000..f33f397a --- /dev/null +++ b/agentops/llms/instrumented_provider.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import Optional + +from ..session import Session +from ..event import LLMEvent + + +class InstrumentedProvider(ABC): + _provider_name: str = "InstrumentedModel" + llm_event: Optional[LLMEvent] = None + client = None + + def __init__(self, client): + self.client = client + + @abstractmethod + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ) -> dict: + pass + + @abstractmethod + def override(self): + pass + + @abstractmethod + def undo_override(self): + pass + + @property + def provider_name(self): + return self._provider_name + + def _safe_record(self, session, event): + if session is not None: + session.record(event) + else: + self.client.record(event) diff --git a/agentops/llms/litellm.py b/agentops/llms/litellm.py new file mode 100644 index 00000000..39836c10 --- /dev/null +++ b/agentops/llms/litellm.py @@ -0,0 +1,241 @@ +import pprint +from typing import Optional + +from ..log_config import logger +from ..event import LLMEvent, ErrorEvent +from ..session import Session +from agentops.helpers import get_ISO_time, check_call_stack_for_agent_id +from agentops.llms.instrumented_provider import InstrumentedProvider +from agentops.time_travel import fetch_completion_override_from_time_travel_cache +from ..singleton import singleton + + +@singleton +class LiteLLMProvider(InstrumentedProvider): + original_create = None + original_create_async = None + original_oai_create = None + original_oai_create_async = None + + def __init__(self, client): + super().__init__(client) + + def override(self): + self._override_async_completion() + self._override_completion() + + def undo_override(self): + import litellm + from openai.resources.chat import completions + + litellm.acompletion = self.original_create_async + litellm.completion = self.original_create + + completions.Completions.create = self.original_oai_create + completions.AsyncCompletions.create = self.original_oai_create_async + + def handle_response( + self, response, kwargs, init_timestamp, session: 
Optional[Session] = None + ) -> dict: + """Handle responses for OpenAI versions >v1.0.0""" + from openai import AsyncStream, Stream + from openai.resources import AsyncCompletions + from openai.types.chat import ChatCompletionChunk + from litellm.utils import CustomStreamWrapper + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + if session is not None: + self.llm_event.session_id = session.session_id + + def handle_stream_chunk(chunk: ChatCompletionChunk): + # NOTE: prompt/completion usage not returned in response when streaming + # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if self.llm_event.returns == None: + self.llm_event.returns = chunk + + try: + accumulated_delta = self.llm_event.returns.choices[0].delta + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = chunk.model + self.llm_event.prompt = kwargs["messages"] + + # NOTE: We assume for completion only choices[0] is relevant + choice = chunk.choices[0] + + if choice.delta.content: + accumulated_delta.content += choice.delta.content + + if choice.delta.role: + accumulated_delta.role = choice.delta.role + + if choice.delta.tool_calls: + accumulated_delta.tool_calls = choice.delta.tool_calls + + if choice.delta.function_call: + accumulated_delta.function_call = choice.delta.function_call + + if choice.finish_reason: + # Streaming is done. Record LLMEvent + self.llm_event.returns.choices[0].finish_reason = ( + choice.finish_reason + ) + self.llm_event.completion = { + "role": accumulated_delta.role, + "content": accumulated_delta.content, + "function_call": accumulated_delta.function_call, + "tool_calls": accumulated_delta.tool_calls, + } + self.llm_event.end_timestamp = get_ISO_time() + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. 
Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # litellm uses a CustomStreamWrapper + if isinstance(response, CustomStreamWrapper): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + elif isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncCompletion + elif isinstance(response, AsyncCompletions): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # v1.0.0+ responses are objects + try: + self.llm_event.returns = response + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.prompt_tokens + self.llm_event.completion = response.choices[0].message.model_dump() + self.llm_event.completion_tokens = response.usage.completion_tokens + self.llm_event.model = response.model + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def _override_completion(self): + import litellm + from openai.types.chat import ( + ChatCompletion, + ) # Note: litellm calls all LLM APIs using the OpenAI format + from openai.resources.chat import completions + + self.original_create = litellm.completion + self.original_oai_create = completions.Completions.create + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if completion_override: + result_model = ChatCompletion.model_validate_json(completion_override) + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) + # if prompt_override: + # kwargs["messages"] = prompt_override["messages"] + + # Call the original function with its original arguments + result = self.original_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + litellm.completion = patched_function + + def _override_async_completion(self): + import litellm + from openai.types.chat import ( + ChatCompletion, + ) # Note: litellm calls all LLM APIs using the OpenAI format + from openai.resources.chat import completions + + self.original_create_async = litellm.acompletion + self.original_oai_create_async = completions.AsyncCompletions.create + + async def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if 
completion_override: + result_model = ChatCompletion.model_validate_json(completion_override) + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) + # if prompt_override: + # kwargs["messages"] = prompt_override["messages"] + + # Call the original function with its original arguments + result = await self.original_create_async(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + litellm.acompletion = patched_function diff --git a/agentops/llms/ollama.py b/agentops/llms/ollama.py new file mode 100644 index 00000000..6b3a8f84 --- /dev/null +++ b/agentops/llms/ollama.py @@ -0,0 +1,120 @@ +import inspect +import sys +from typing import Optional + +from ..event import LLMEvent +from ..session import Session +from agentops.helpers import get_ISO_time, check_call_stack_for_agent_id +from .instrumented_provider import InstrumentedProvider +from ..singleton import singleton + +original_func = {} + + +@singleton +class OllamaProvider(InstrumentedProvider): + original_create = None + original_create_async = None + + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ) -> dict: + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + + def handle_stream_chunk(chunk: dict): + message = chunk.get("message", {"role": None, "content": ""}) + + if chunk.get("done"): + self.llm_event.completion["content"] += message.get("content") + self.llm_event.end_timestamp = get_ISO_time() + self.llm_event.model = f'ollama/{chunk.get("model")}' + self.llm_event.returns = chunk + self.llm_event.returns["message"] = self.llm_event.completion + self.llm_event.prompt = kwargs["messages"] + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.client.record(self.llm_event) + + if self.llm_event.completion is None: + self.llm_event.completion = message + else: + self.llm_event.completion["content"] += message.get("content") + + if inspect.isgenerator(response): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + self.llm_event.end_timestamp = get_ISO_time() + + self.llm_event.model = f'ollama/{response["model"]}' + self.llm_event.returns = response + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.completion = response["message"] + + self._safe_record(session, self.llm_event) + return response + + def override(self): + self._override_chat_client() + self._override_chat() + self._override_chat_async_client() + + def undo_override(self): + if "ollama" in sys.modules: + import ollama + + ollama.chat = original_func["ollama.chat"] + ollama.Client.chat = original_func["ollama.Client.chat"] + ollama.AsyncClient.chat = original_func["ollama.AsyncClient.chat"] + + def __init__(self, client): + super().__init__(client) + + def _override_chat(self): + import ollama + + original_func["ollama.chat"] = ollama.chat + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_func["ollama.chat"](*args, **kwargs) + return self.handle_response( + result, kwargs, init_timestamp, session=kwargs.get("session", None) + ) + + # Override the original method with the patched one + ollama.chat = patched_function + + def 
_override_chat_client(self): + from ollama import Client + + original_func["ollama.Client.chat"] = Client.chat + + def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = original_func["ollama.Client.chat"](*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp) + + # Override the original method with the patched one + Client.chat = patched_function + + def _override_chat_async_client(self): + from ollama import AsyncClient + + original_func["ollama.AsyncClient.chat"] = AsyncClient.chat + + async def patched_function(*args, **kwargs): + # Call the original function with its original arguments + init_timestamp = get_ISO_time() + result = await original_func["ollama.AsyncClient.chat"](*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp) + + # Override the original method with the patched one + AsyncClient.chat = patched_function diff --git a/agentops/llms/openai.py b/agentops/llms/openai.py new file mode 100644 index 00000000..fc16ab01 --- /dev/null +++ b/agentops/llms/openai.py @@ -0,0 +1,257 @@ +import inspect +import pprint +from typing import Optional + +from agentops.llms.instrumented_provider import InstrumentedProvider +from agentops.time_travel import fetch_completion_override_from_time_travel_cache + +from ..event import ActionEvent, ErrorEvent, LLMEvent +from ..session import Session +from ..log_config import logger +from ..helpers import check_call_stack_for_agent_id, get_ISO_time +from ..singleton import singleton + + +@singleton +class OpenAiProvider(InstrumentedProvider): + + original_create = None + original_create_async = None + + def __init__(self, client): + super().__init__(client) + self._provider_name = "OpenAI" + + def handle_response( + self, response, kwargs, init_timestamp, session: Optional[Session] = None + ) -> dict: + """Handle responses for OpenAI versions >v1.0.0""" + from openai import AsyncStream, Stream + from openai.resources import AsyncCompletions + from openai.types.chat import ChatCompletionChunk + + self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs) + if session is not None: + self.llm_event.session_id = session.session_id + + def handle_stream_chunk(chunk: ChatCompletionChunk): + # NOTE: prompt/completion usage not returned in response when streaming + # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion + if self.llm_event.returns == None: + self.llm_event.returns = chunk + + try: + accumulated_delta = self.llm_event.returns.choices[0].delta + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.model = chunk.model + self.llm_event.prompt = kwargs["messages"] + + # NOTE: We assume for completion only choices[0] is relevant + choice = chunk.choices[0] + + if choice.delta.content: + accumulated_delta.content += choice.delta.content + + if choice.delta.role: + accumulated_delta.role = choice.delta.role + + if choice.delta.tool_calls: + accumulated_delta.tool_calls = choice.delta.tool_calls + + if choice.delta.function_call: + accumulated_delta.function_call = choice.delta.function_call + + if choice.finish_reason: + # Streaming is done. 
Record LLMEvent + self.llm_event.returns.choices[0].finish_reason = ( + choice.finish_reason + ) + self.llm_event.completion = { + "role": accumulated_delta.role, + "content": accumulated_delta.content, + "function_call": accumulated_delta.function_call, + "tool_calls": accumulated_delta.tool_calls, + } + self.llm_event.end_timestamp = get_ISO_time() + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + chunk = pprint.pformat(chunk) + logger.warning( + f"Unable to parse a chunk for LLM call. Skipping upload to AgentOps\n" + f"chunk:\n {chunk}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + # if the response is a generator, decorate the generator + if isinstance(response, Stream): + + def generator(): + for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return generator() + + # For asynchronous AsyncStream + elif isinstance(response, AsyncStream): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # For async AsyncCompletion + elif isinstance(response, AsyncCompletions): + + async def async_generator(): + async for chunk in response: + handle_stream_chunk(chunk) + yield chunk + + return async_generator() + + # v1.0.0+ responses are objects + try: + self.llm_event.returns = response + self.llm_event.agent_id = check_call_stack_for_agent_id() + self.llm_event.prompt = kwargs["messages"] + self.llm_event.prompt_tokens = response.usage.prompt_tokens + self.llm_event.completion = response.choices[0].message.model_dump() + self.llm_event.completion_tokens = response.usage.completion_tokens + self.llm_event.model = response.model + + self._safe_record(session, self.llm_event) + except Exception as e: + self._safe_record( + session, ErrorEvent(trigger_event=self.llm_event, exception=e) + ) + + kwargs_str = pprint.pformat(kwargs) + response = pprint.pformat(response) + logger.warning( + f"Unable to parse response for LLM call. 
Skipping upload to AgentOps\n" + f"response:\n {response}\n" + f"kwargs:\n {kwargs_str}\n" + ) + + return response + + def override(self): + self._override_openai_v1_completion() + self._override_openai_v1_async_completion() + + def _override_openai_v1_completion(self): + from openai.resources.chat import completions + from openai.types.chat import ChatCompletion, ChatCompletionChunk + + # Store the original method + self.original_create = completions.Completions.create + + def patched_function(*args, **kwargs): + init_timestamp = get_ISO_time() + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if completion_override: + result_model = None + pydantic_models = (ChatCompletion, ChatCompletionChunk) + for pydantic_model in pydantic_models: + try: + result_model = pydantic_model.model_validate_json( + completion_override + ) + break + except Exception as e: + pass + + if result_model is None: + logger.error( + f"Time Travel: Pydantic validation failed for {pydantic_models} \n" + f"Time Travel: Completion override was:\n" + f"{pprint.pformat(completion_override)}" + ) + return None + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) + # if prompt_override: + # kwargs["messages"] = prompt_override["messages"] + + # Call the original function with its original arguments + result = self.original_create(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + completions.Completions.create = patched_function + + def _override_openai_v1_async_completion(self): + from openai.resources.chat import completions + from openai.types.chat import ChatCompletion, ChatCompletionChunk + + # Store the original method + self.original_create_async = completions.AsyncCompletions.create + + async def patched_function(*args, **kwargs): + + init_timestamp = get_ISO_time() + + session = kwargs.get("session", None) + if "session" in kwargs.keys(): + del kwargs["session"] + + completion_override = fetch_completion_override_from_time_travel_cache( + kwargs + ) + if completion_override: + result_model = None + pydantic_models = (ChatCompletion, ChatCompletionChunk) + for pydantic_model in pydantic_models: + try: + result_model = pydantic_model.model_validate_json( + completion_override + ) + break + except Exception as e: + pass + + if result_model is None: + logger.error( + f"Time Travel: Pydantic validation failed for {pydantic_models} \n" + f"Time Travel: Completion override was:\n" + f"{pprint.pformat(completion_override)}" + ) + return None + return self.handle_response( + result_model, kwargs, init_timestamp, session=session + ) + + # prompt_override = fetch_prompt_override_from_time_travel_cache(kwargs) + # if prompt_override: + # kwargs["messages"] = prompt_override["messages"] + + # Call the original function with its original arguments + result = await original_create_async(*args, **kwargs) + return self.handle_response(result, kwargs, init_timestamp, session=session) + + # Override the original method with the patched one + completions.AsyncCompletions.create = patched_function + + def undo_override(self): + from openai.resources.chat import completions + + completions.AsyncCompletions.create = self.original_create_async + completions.Completions.create = 
self.original_create diff --git a/agentops/singleton.py b/agentops/singleton.py new file mode 100644 index 00000000..8200510e --- /dev/null +++ b/agentops/singleton.py @@ -0,0 +1,30 @@ +ao_instances = {} + + +def singleton(class_): + + def getinstance(*args, **kwargs): + if class_ not in ao_instances: + ao_instances[class_] = class_(*args, **kwargs) + return ao_instances[class_] + + return getinstance + + +def conditional_singleton(class_): + + def getinstance(*args, **kwargs): + use_singleton = kwargs.pop("use_singleton", True) + if use_singleton: + if class_ not in ao_instances: + ao_instances[class_] = class_(*args, **kwargs) + return ao_instances[class_] + else: + return class_(*args, **kwargs) + + return getinstance + + +def clear_singletons(): + global ao_instances + ao_instances = {} diff --git a/agentops/time_travel.py b/agentops/time_travel.py index caf7887c..14e8b2af 100644 --- a/agentops/time_travel.py +++ b/agentops/time_travel.py @@ -3,7 +3,7 @@ import os from .http_client import HttpClient from .exceptions import ApiServerException -from .helpers import singleton +from .singleton import singleton @singleton diff --git a/docs/v1/usage/tracking-llm-calls.mdx b/docs/v1/usage/tracking-llm-calls.mdx index d51137ac..9cb93ab8 100644 --- a/docs/v1/usage/tracking-llm-calls.mdx +++ b/docs/v1/usage/tracking-llm-calls.mdx @@ -5,8 +5,8 @@ description: "Tracking LLM Calls using the AgentOps SDK" ### How it works -When the AgentOps SDK detects the `openai`, `litellm`, or `cohere` as installed modules, it will automatically -start tracking their usage. No further work is required from you! ๐Ÿ˜Š +When the AgentOps SDK detects a supported LLM provider module installed, it will automatically +start tracking its usage. No further work is required from you! ๐Ÿ˜Š ### Not working? @@ -28,6 +28,14 @@ To get started, just follow the quick start guide. +### Stop Tracking LLM Calls + +To stop tracking LLM calls after running `agentops.init()`, you can call `agentops.stop_instrumenting()`. + +This function reverts the changes made to your LLM Provider's module, removing AgentOps instrumentation. + +_Special consideration for Cohere: Calling `stop_instrumenting()` has no effect on previously instantiated Cohere clients. 
You must create a new Cohere client after calling this function._ + diff --git a/examples/README.md b/examples/README.md index 62c35dd3..7bc75c15 100644 --- a/examples/README.md +++ b/examples/README.md @@ -8,4 +8,6 @@ ## Integrations - [Using Langchain](./langchain_examples.ipynb) - [Crew.ai](https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator) - - Crew is a framework for developing agents, a number of their example projects use AgentOps \ No newline at end of file + - Crew is a framework for developing agents, a number of their example projects use AgentOps +- [Cohere](./cohere_example.ipynb) +- [Anthropic](./anthropic_example.ipynb) \ No newline at end of file diff --git a/examples/anthropic/anthropic_example.ipynb b/examples/anthropic/anthropic_example.ipynb new file mode 100644 index 00000000..3f287621 --- /dev/null +++ b/examples/anthropic/anthropic_example.ipynb @@ -0,0 +1,235 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Anthropic Example" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First let's install the required packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%pip install -U anthropic\n", + "%pip install -U agentops" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then import them" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from anthropic import Anthropic, AsyncAnthropic\n", + "import agentops\n", + "import os\n", + "from dotenv import load_dotenv" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we'll grab our API keys. You can use dotenv like below or however else you like to load environment variables" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "load_dotenv()\n", + "ANTHROPIC_API_KEY = os.getenv(\"ANTHROPIC_API_KEY\") or \"\"\n", + "AGENTOPS_API_KEY = os.getenv(\"AGENTOPS_API_KEY\") or \"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "agentops.init(AGENTOPS_API_KEY, default_tags=[\"anthropic-example\"])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Sync Example\n", + "We will demonstrate a basic sync call to Anthropic using the Claude 3.5 Sonnet model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client = Anthropic()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "stream = client.messages.create(\n", + " max_tokens=1024,\n", + " model=\"claude-3-5-sonnet-20240620\",\n", + " messages=[\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": \"What is 2+2?\",\n", + " }\n", + " ],\n", + " stream=True,\n", + ")\n", + "\n", + "response = \"\"\n", + "for event in stream:\n", + " if event.type == \"content_block_delta\":\n", + " response += event.delta.text\n", + " elif event.type == \"message_stop\":\n", + " print(response)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Async Example\n", + "The async example is very similar to the sync example, but it uses the AsyncAnthropic client from the anthropic library." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "aclient = AsyncAnthropic()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "async def main() -> None:\n", + " message = await aclient.messages.create(\n", + " max_tokens=1024,\n", + " messages=[\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": \"Explain everything you know about the Leibniz Equation.\",\n", + " }\n", + " ],\n", + " model=\"claude-3-5-sonnet-20240620\",\n", + " )\n", + " print(message.content[0].text)\n", + "\n", + "\n", + "await main()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Tool Calling Example\n", + "Anthropic models support tool calling, which allows the model to call external APIs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "stream = client.messages.create(\n", + " max_tokens=1024,\n", + " model=\"claude-3-5-sonnet-20240620\",\n", + " tools=[\n", + " {\n", + " \"name\": \"web_search\",\n", + " \"description\": \"Search the web for information\",\n", + " \"input_schema\": {\n", + " \"type\": \"object\",\n", + " \"properties\": {\n", + " \"query\": {\"type\": \"string\", \"description\": \"The search query\"}\n", + " },\n", + " \"required\": [\"query\"],\n", + " },\n", + " }\n", + " ],\n", + " messages=[\n", + " {\n", + " \"role\": \"user\",\n", + " \"content\": \"Tell me everything you can about AgentOps\",\n", + " }\n", + " ],\n", + " stream=True,\n", + ")\n", + "\n", + "response = \"\"\n", + "for event in stream:\n", + " if event.type == \"content_block_delta\":\n", + " if event.delta.type == \"text\":\n", + " response += event.delta.text\n", + " elif event.type == \"message_stop\":\n", + " print(response)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "agentops.end_session(\"Success\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ops", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/tests/core_manual_tests/canary.py b/tests/core_manual_tests/canary.py index 325cb840..72e2e357 100644 --- a/tests/core_manual_tests/canary.py +++ b/tests/core_manual_tests/canary.py @@ -4,7 +4,7 @@ from agentops import ActionEvent load_dotenv() -agentops.init() +agentops.init(default_tags=["canary"]) openai = OpenAI() messages = [{"role": "user", "content": "Hello"}] diff --git a/tests/core_manual_tests/providers/anthropic_canary.py b/tests/core_manual_tests/providers/anthropic_canary.py new file mode 100644 index 00000000..bc505606 --- /dev/null +++ b/tests/core_manual_tests/providers/anthropic_canary.py @@ -0,0 +1,78 @@ +import asyncio + +import agentops +from dotenv import load_dotenv +import anthropic + +load_dotenv() +agentops.init(default_tags=["anthropic-provider-test"]) +anthropic_client = anthropic.Anthropic() +async_anthropic_client = anthropic.AsyncAnthropic() + +response = anthropic_client.messages.create( + max_tokens=1024, + model="claude-3-5-sonnet-20240620", + messages=[ + { + "role": "user", + "content": "say hi", + } + ], +) + + +stream_response = 
anthropic_client.messages.create( + max_tokens=1024, + model="claude-3-5-sonnet-20240620", + messages=[ + { + "role": "user", + "content": "asy hi 2", + } + ], + stream=True, +) + +response = "" +for event in stream_response: + if event.type == "content_block_delta": + response += event.delta.text + elif event.type == "message_stop": + print(response) + + +async def async_test(): + async_response = await async_anthropic_client.messages.create( + max_tokens=1024, + model="claude-3-5-sonnet-20240620", + messages=[ + { + "role": "user", + "content": "say hi 3", + } + ], + ) + print(async_response) + + +asyncio.run(async_test()) + +agentops.stop_instrumenting() + +untracked_response = anthropic_client.messages.create( + max_tokens=1024, + model="claude-3-5-sonnet-20240620", + messages=[ + { + "role": "user", + "content": "say hi 4", + } + ], +) + + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with one LLM event +### diff --git a/tests/core_manual_tests/providers/cohere_canary.py b/tests/core_manual_tests/providers/cohere_canary.py new file mode 100644 index 00000000..fcec2006 --- /dev/null +++ b/tests/core_manual_tests/providers/cohere_canary.py @@ -0,0 +1,45 @@ +import asyncio + +import agentops +from dotenv import load_dotenv +import cohere + +load_dotenv() +agentops.init(default_tags=["cohere-provider-test"]) +co = cohere.Client() +aco = cohere.AsyncClient() + +chat = co.chat(message="say hi 1") + +stream = co.chat_stream(message="say hi 2") +response = "" +for event in stream: + if event.event_type == "text-generation": + response += event.text + print(event.text, end="") + elif event.event_type == "stream-end": + print("\n") + print(event) + print("\n") + + +async def main(): + async_response = await aco.chat(message="say hi 3") + print(async_response) + + +asyncio.run(main()) + +agentops.stop_instrumenting() + +# for cohere, because the client is not a singleton, calling `stop_instrumenting()` only affects +# new clients created, not existing clients. 
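+# note: as stated above, the existing `co`/`aco` clients are unaffected by
+# stop_instrumenting() and are expected to remain tracked; only clients created
+# from this point on, like `co_untracked` below, should be untracked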
+ +co_untracked = cohere.Client() +chat_untracked = co_untracked.chat(message="say hi untracked") + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with one LLM event +### diff --git a/tests/core_manual_tests/providers/groq_canary.py b/tests/core_manual_tests/providers/groq_canary.py new file mode 100644 index 00000000..1d91f33c --- /dev/null +++ b/tests/core_manual_tests/providers/groq_canary.py @@ -0,0 +1,60 @@ +import asyncio + +import agentops +from dotenv import load_dotenv +import os +from groq import Groq, AsyncGroq + +load_dotenv() +agentops.init(default_tags=["groq-provider-test"]) +groq_client = Groq(api_key=os.getenv("GROQ_API_KEY")) +async_groq_client = AsyncGroq(api_key=os.getenv("GROQ_API_KEY")) + +messages = [{"role": "user", "content": "Hello"}] + +# option 1: use session.patch +res = groq_client.chat.completions.create( + model="llama3-70b-8192", + messages=[ + {"role": "user", "content": "Say hello"}, + ], +) + +stream_res = groq_client.chat.completions.create( + model="llama3-70b-8192", + messages=[ + {"role": "user", "content": "Say hello"}, + ], + stream=True, +) + +for chunk in stream_res: + print(chunk) + + +async def async_test(): + async_res = await async_groq_client.chat.completions.create( + model="llama3-70b-8192", + messages=[ + {"role": "user", "content": "Say hello"}, + ], + ) + print(async_res) + + +asyncio.run(async_test()) + +agentops.stop_instrumenting() + +untracked_res = groq_client.chat.completions.create( + model="llama3-70b-8192", + messages=[ + {"role": "user", "content": "Say hello"}, + ], +) + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with one LLM event +### diff --git a/tests/core_manual_tests/providers/litellm_canary.py b/tests/core_manual_tests/providers/litellm_canary.py new file mode 100644 index 00000000..ecd2186a --- /dev/null +++ b/tests/core_manual_tests/providers/litellm_canary.py @@ -0,0 +1,44 @@ +import asyncio + +import agentops +from dotenv import load_dotenv +import litellm + +load_dotenv() +agentops.init(default_tags=["litellm-provider-test"]) + +response = litellm.completion( + model="gpt-3.5-turbo", messages=[{"content": "Hello, how are you?", "role": "user"}] +) + +stream_response = litellm.completion( + model="gpt-3.5-turbo", + messages=[{"content": "Hello, how are you?", "role": "user"}], + stream=True, +) +print(stream_response) +for chunk in stream_response: + print(chunk) + + +async def main(): + async_response = await litellm.acompletion( + model="gpt-3.5-turbo", + messages=[{"content": "Hello, how are you?", "role": "user"}], + ) + print(async_response) + + +asyncio.run(main()) + +agentops.stop_instrumenting() + +untracked_response = litellm.completion( + model="gpt-3.5-turbo", messages=[{"content": "Hello, how are you?", "role": "user"}] +) + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with one LLM event +### diff --git a/tests/core_manual_tests/providers/ollama_canary.py b/tests/core_manual_tests/providers/ollama_canary.py new file mode 100644 index 00000000..667a1f33 --- /dev/null +++ b/tests/core_manual_tests/providers/ollama_canary.py @@ -0,0 +1,58 @@ +import asyncio + +import agentops +from dotenv import load_dotenv +import ollama +from ollama import AsyncClient + +load_dotenv() +agentops.init(default_tags=["ollama-provider-test"]) + +response = ollama.chat( + model="llama3.1", + messages=[ + { + "role": "user", + "content": "say hello sync", + }, + ], +) + 
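+# streaming variant: the Ollama provider accumulates the streamed chunks and
+# records a single LLMEvent once the final chunk (done=True) arrives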
+stream_response = ollama.chat( + model="llama3.1", + messages=[ + { + "role": "user", + "content": "say hello str", + }, + ], + stream=True, +) +for chunk in stream_response: + print(chunk) + + +async def main(): + message = {"role": "user", "content": "say hello mr. async"} + async_response = await AsyncClient().chat(model="llama3.1", messages=[message]) + + +asyncio.run(main()) + +agentops.stop_instrumenting() + +untracked_response = ollama.chat( + model="llama3.1", + messages=[ + { + "role": "user", + "content": "say hello", + }, + ], +) + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with one LLM event +### diff --git a/tests/core_manual_tests/providers/openai_canary.py b/tests/core_manual_tests/providers/openai_canary.py new file mode 100644 index 00000000..3244109d --- /dev/null +++ b/tests/core_manual_tests/providers/openai_canary.py @@ -0,0 +1,48 @@ +import agentops +import asyncio +from openai import OpenAI, AsyncOpenAI +from dotenv import load_dotenv +from agentops import ActionEvent + +load_dotenv() +agentops.init(default_tags=["openai-v1-provider-test"]) +openai = OpenAI() +async_openai = OpenAI() + + +# option 1: use session.patch +response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello"}], + temperature=0.5, +) + +stream_response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello streamed"}], + temperature=0.5, + stream=True, +) + +for chunk in stream_response: + print(chunk) + +async_response = async_openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello async"}], + temperature=0.5, +) + +agentops.stop_instrumenting() + +not_tracked_response = openai.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello untracked"}], + temperature=0.5, +) + +agentops.end_session(end_state="Success") + +### +# Used to verify that one session is created with two LLM events +### diff --git a/tests/test_agent.py b/tests/test_agent.py index abf7b12a..408161b9 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -1,6 +1,4 @@ from unittest import TestCase -from unittest.mock import patch, MagicMock -from uuid import uuid4 from agentops import track_agent import agentops diff --git a/tests/test_canary.py b/tests/test_canary.py index a89cf344..8ff49a01 100644 --- a/tests/test_canary.py +++ b/tests/test_canary.py @@ -3,7 +3,7 @@ import time import agentops from agentops import ActionEvent -from agentops.helpers import clear_singletons +from agentops.singleton import clear_singletons @pytest.fixture(autouse=True) diff --git a/tests/test_events.py b/tests/test_events.py index 5234f3fa..a2d70d75 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -3,7 +3,7 @@ import pytest import agentops from agentops import ActionEvent, ErrorEvent -from agentops.helpers import clear_singletons +from agentops.singleton import clear_singletons @pytest.fixture(autouse=True) diff --git a/tests/test_record_action.py b/tests/test_record_action.py index d3258583..bc433c55 100644 --- a/tests/test_record_action.py +++ b/tests/test_record_action.py @@ -4,7 +4,7 @@ import agentops from agentops import record_action from datetime import datetime -from agentops.helpers import clear_singletons +from agentops.singleton import clear_singletons import contextlib jwts = ["some_jwt", "some_jwt2", "some_jwt3"] diff --git a/tests/test_record_tool.py 
b/tests/test_record_tool.py index c9f44471..b290e131 100644 --- a/tests/test_record_tool.py +++ b/tests/test_record_tool.py @@ -5,7 +5,7 @@ from agentops import record_tool from datetime import datetime -from agentops.helpers import clear_singletons +from agentops.singleton import clear_singletons import contextlib jwts = ["some_jwt", "some_jwt2", "some_jwt3"] diff --git a/tests/test_session.py b/tests/test_session.py index f5462085..075aca84 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -3,8 +3,7 @@ import time import agentops from agentops import ActionEvent, Client -from agentops.exceptions import NoSessionException, MultiSessionException -from agentops.helpers import clear_singletons +from agentops.singleton import clear_singletons @pytest.fixture(autouse=True) diff --git a/tests/test_singleton.py b/tests/test_singleton.py index ff936c30..ca479090 100644 --- a/tests/test_singleton.py +++ b/tests/test_singleton.py @@ -1,8 +1,6 @@ import uuid -import pytest - -from agentops.helpers import singleton, conditional_singleton, clear_singletons +from agentops.singleton import singleton, conditional_singleton, clear_singletons @singleton