diff --git a/agentops/llms/__init__.py b/agentops/llms/__init__.py
index e3e6f7cf..8c7ba5f4 100644
--- a/agentops/llms/__init__.py
+++ b/agentops/llms/__init__.py
@@ -43,7 +43,6 @@ class LlmTracker:
 
     def __init__(self, client):
         self.client = client
-        self.completion = ""
 
     def override_api(self):
         """
diff --git a/agentops/llms/anthropic.py b/agentops/llms/anthropic.py
index 322d2181..e0e78891 100644
--- a/agentops/llms/anthropic.py
+++ b/agentops/llms/anthropic.py
@@ -31,27 +31,27 @@ def handle_response(
         from anthropic.resources import AsyncMessages
         from anthropic.types import Message
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: Message):
             try:
                 # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion
                 if chunk.type == "message_start":
-                    self.llm_event.returns = chunk
-                    self.llm_event.agent_id = check_call_stack_for_agent_id()
-                    self.llm_event.model = kwargs["model"]
-                    self.llm_event.prompt = kwargs["messages"]
-                    self.llm_event.prompt_tokens = chunk.message.usage.input_tokens
-                    self.llm_event.completion = {
+                    llm_event.returns = chunk
+                    llm_event.agent_id = check_call_stack_for_agent_id()
+                    llm_event.model = kwargs["model"]
+                    llm_event.prompt = kwargs["messages"]
+                    llm_event.prompt_tokens = chunk.message.usage.input_tokens
+                    llm_event.completion = {
                         "role": chunk.message.role,
                         "content": "",  # Always returned as [] in this instance type
                     }
 
                 elif chunk.type == "content_block_start":
                     if chunk.content_block.type == "text":
-                        self.llm_event.completion["content"] += chunk.content_block.text
+                        llm_event.completion["content"] += chunk.content_block.text
 
                     elif chunk.content_block.type == "tool_use":
                         self.tool_id = chunk.content_block.id
@@ -62,7 +62,7 @@ def handle_stream_chunk(chunk: Message):
 
                 elif chunk.type == "content_block_delta":
                     if chunk.delta.type == "text_delta":
-                        self.llm_event.completion["content"] += chunk.delta.text
+                        llm_event.completion["content"] += chunk.delta.text
 
                     elif chunk.delta.type == "input_json_delta":
                         self.tool_event[self.tool_id].logs[
@@ -73,15 +73,15 @@ def handle_stream_chunk(chunk: Message):
                         pass
 
                 elif chunk.type == "message_delta":
-                    self.llm_event.completion_tokens = chunk.usage.output_tokens
+                    llm_event.completion_tokens = chunk.usage.output_tokens
 
                 elif chunk.type == "message_stop":
-                    self.llm_event.end_timestamp = get_ISO_time()
-                    self._safe_record(session, self.llm_event)
+                    llm_event.end_timestamp = get_ISO_time()
+                    self._safe_record(session, llm_event)
 
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
                 kwargs_str = pprint.pformat(kwargs)
@@ -124,23 +124,21 @@ async def async_generator():
 
         # Handle object responses
         try:
-            self.llm_event.returns = response.model_dump()
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.input_tokens
-            self.llm_event.completion = {
+            llm_event.returns = response.model_dump()
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.input_tokens
+            llm_event.completion = {
                 "role": "assistant",
                 "content": response.content[0].text,
             }
-            self.llm_event.completion_tokens = response.usage.output_tokens
-            self.llm_event.model = response.model
-            self.llm_event.end_timestamp = get_ISO_time()
+            llm_event.completion_tokens = response.usage.output_tokens
+            llm_event.model = response.model
+            llm_event.end_timestamp = get_ISO_time()
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
             kwargs_str = pprint.pformat(kwargs)
             response = pprint.pformat(response)
             logger.warning(
diff --git a/agentops/llms/cohere.py b/agentops/llms/cohere.py
index 68658761..d76c221d 100644
--- a/agentops/llms/cohere.py
+++ b/agentops/llms/cohere.py
@@ -52,9 +52,9 @@ def handle_response(
         # from cohere.types.chat import ChatGenerationChunk
 
         # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant"
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         self.action_events = {}
 
@@ -62,22 +62,22 @@ def handle_stream_chunk(chunk, session: Optional[Session] = None):
 
            # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion
            if isinstance(chunk, StreamedChatResponse_StreamStart):
-                self.llm_event.returns = chunk
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = kwargs.get("model", "command-r-plus")
-                self.llm_event.prompt = kwargs["message"]
-                self.llm_event.completion = ""
+                llm_event.returns = chunk
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = kwargs.get("model", "command-r-plus")
+                llm_event.prompt = kwargs["message"]
+                llm_event.completion = ""
                return
 
            try:
                if isinstance(chunk, StreamedChatResponse_StreamEnd):
                    # StreamedChatResponse_TextGeneration = LLMEvent
-                    self.llm_event.completion = {
+                    llm_event.completion = {
                        "role": "assistant",
                        "content": chunk.response.text,
                    }
-                    self.llm_event.end_timestamp = get_ISO_time()
-                    self._safe_record(session, self.llm_event)
+                    llm_event.end_timestamp = get_ISO_time()
+                    self._safe_record(session, llm_event)
 
                    # StreamedChatResponse_SearchResults = ActionEvent
                    search_results = chunk.response.search_results
@@ -115,7 +115,7 @@ def handle_stream_chunk(chunk, session: Optional[Session] = None):
                        self._safe_record(session, action_event)
 
                elif isinstance(chunk, StreamedChatResponse_TextGeneration):
-                    self.llm_event.completion += chunk.text
+                    llm_event.completion += chunk.text
                elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration):
                    pass
                elif isinstance(chunk, StreamedChatResponse_CitationGeneration):
@@ -139,7 +139,7 @@ def handle_stream_chunk(chunk, session: Optional[Session] = None):
 
            except Exception as e:
                self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                )
 
                kwargs_str = pprint.pformat(kwargs)
@@ -175,15 +175,15 @@ def generator():
 
        # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called
        try:
-            self.llm_event.returns = response
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = []
+            llm_event.returns = response
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = []
            if response.chat_history:
                role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"}
 
                for i in range(len(response.chat_history) - 1):
                    message = response.chat_history[i]
-                    self.llm_event.prompt.append(
+                    llm_event.prompt.append(
                        {
                            "role": role_map.get(message.role, message.role),
                            "content": message.message,
@@ -191,19 +191,17 @@ def generator():
                    )
 
                last_message = response.chat_history[-1]
-                self.llm_event.completion = {
+                llm_event.completion = {
                    "role": role_map.get(last_message.role, last_message.role),
                    "content": last_message.message,
                }
-            self.llm_event.prompt_tokens = response.meta.tokens.input_tokens
-            self.llm_event.completion_tokens = response.meta.tokens.output_tokens
-            self.llm_event.model = kwargs.get("model", "command-r-plus")
+            llm_event.prompt_tokens = response.meta.tokens.input_tokens
+            llm_event.completion_tokens = response.meta.tokens.output_tokens
+            llm_event.model = kwargs.get("model", "command-r-plus")
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
        except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
            kwargs_str = pprint.pformat(kwargs)
            response = pprint.pformat(response)
            logger.warning(
diff --git a/agentops/llms/groq.py b/agentops/llms/groq.py
index 7d5f6800..ca869638 100644
--- a/agentops/llms/groq.py
+++ b/agentops/llms/groq.py
@@ -37,21 +37,21 @@ def handle_response(
         from groq.resources.chat import AsyncCompletions
         from groq.types.chat import ChatCompletionChunk
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: ChatCompletionChunk):
             # NOTE: prompt/completion usage not returned in response when streaming
             # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
-            if self.llm_event.returns == None:
-                self.llm_event.returns = chunk
+            if llm_event.returns == None:
+                llm_event.returns = chunk
 
             try:
-                accumulated_delta = self.llm_event.returns.choices[0].delta
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = chunk.model
-                self.llm_event.prompt = kwargs["messages"]
+                accumulated_delta = llm_event.returns.choices[0].delta
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = chunk.model
+                llm_event.prompt = kwargs["messages"]
 
                 # NOTE: We assume for completion only choices[0] is relevant
                 choice = chunk.choices[0]
@@ -70,21 +70,19 @@ def handle_stream_chunk(chunk: ChatCompletionChunk):
 
                 if choice.finish_reason:
                     # Streaming is done. Record LLMEvent
-                    self.llm_event.returns.choices[0].finish_reason = (
-                        choice.finish_reason
-                    )
-                    self.llm_event.completion = {
+                    llm_event.returns.choices[0].finish_reason = choice.finish_reason
+                    llm_event.completion = {
                         "role": accumulated_delta.role,
                         "content": accumulated_delta.content,
                         "function_call": accumulated_delta.function_call,
                         "tool_calls": accumulated_delta.tool_calls,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
+                    llm_event.end_timestamp = get_ISO_time()
 
-                    self._safe_record(session, self.llm_event)
+                    self._safe_record(session, llm_event)
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
                 kwargs_str = pprint.pformat(kwargs)
@@ -127,19 +125,17 @@ async def async_generator():
 
         # v1.0.0+ responses are objects
         try:
-            self.llm_event.returns = response.model_dump()
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.prompt_tokens
-            self.llm_event.completion = response.choices[0].message.model_dump()
-            self.llm_event.completion_tokens = response.usage.completion_tokens
-            self.llm_event.model = response.model
-
-            self._safe_record(session, self.llm_event)
+            llm_event.returns = response.model_dump()
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.prompt_tokens
+            llm_event.completion = response.choices[0].message.model_dump()
+            llm_event.completion_tokens = response.usage.completion_tokens
+            llm_event.model = response.model
+
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
 
             kwargs_str = pprint.pformat(kwargs)
             response = pprint.pformat(response)
diff --git a/agentops/llms/litellm.py b/agentops/llms/litellm.py
index 053c4251..30b4c25a 100644
--- a/agentops/llms/litellm.py
+++ b/agentops/llms/litellm.py
@@ -49,21 +49,21 @@ def handle_response(
         from openai.types.chat import ChatCompletionChunk
         from litellm.utils import CustomStreamWrapper
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: ChatCompletionChunk):
             # NOTE: prompt/completion usage not returned in response when streaming
             # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
-            if self.llm_event.returns == None:
-                self.llm_event.returns = chunk
+            if llm_event.returns == None:
+                llm_event.returns = chunk
 
             try:
-                accumulated_delta = self.llm_event.returns.choices[0].delta
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = chunk.model
-                self.llm_event.prompt = kwargs["messages"]
+                accumulated_delta = llm_event.returns.choices[0].delta
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = chunk.model
+                llm_event.prompt = kwargs["messages"]
 
                 # NOTE: We assume for completion only choices[0] is relevant
                 choice = chunk.choices[0]
@@ -82,21 +82,19 @@ def handle_stream_chunk(chunk: ChatCompletionChunk):
 
                 if choice.finish_reason:
                     # Streaming is done. Record LLMEvent
-                    self.llm_event.returns.choices[0].finish_reason = (
-                        choice.finish_reason
-                    )
-                    self.llm_event.completion = {
+                    llm_event.returns.choices[0].finish_reason = choice.finish_reason
+                    llm_event.completion = {
                         "role": accumulated_delta.role,
                         "content": accumulated_delta.content,
                         "function_call": accumulated_delta.function_call,
                         "tool_calls": accumulated_delta.tool_calls,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
+                    llm_event.end_timestamp = get_ISO_time()
 
-                    self._safe_record(session, self.llm_event)
+                    self._safe_record(session, llm_event)
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
                 kwargs_str = pprint.pformat(kwargs)
@@ -149,19 +147,17 @@ async def async_generator():
 
         # v1.0.0+ responses are objects
         try:
-            self.llm_event.returns = response
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.prompt_tokens
-            self.llm_event.completion = response.choices[0].message.model_dump()
-            self.llm_event.completion_tokens = response.usage.completion_tokens
-            self.llm_event.model = response.model
-
-            self._safe_record(session, self.llm_event)
+            llm_event.returns = response
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.prompt_tokens
+            llm_event.completion = response.choices[0].message.model_dump()
+            llm_event.completion_tokens = response.usage.completion_tokens
+            llm_event.model = response.model
+
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
 
             kwargs_str = pprint.pformat(kwargs)
             response = pprint.pformat(response)
diff --git a/agentops/llms/ollama.py b/agentops/llms/ollama.py
index bdcb2190..d17aba9a 100644
--- a/agentops/llms/ollama.py
+++ b/agentops/llms/ollama.py
@@ -19,25 +19,25 @@ class OllamaProvider(InstrumentedProvider):
     def handle_response(
         self, response, kwargs, init_timestamp, session: Optional[Session] = None
     ) -> dict:
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
 
         def handle_stream_chunk(chunk: dict):
             message = chunk.get("message", {"role": None, "content": ""})
 
             if chunk.get("done"):
-                self.llm_event.completion["content"] += message.get("content")
-                self.llm_event.end_timestamp = get_ISO_time()
-                self.llm_event.model = f'ollama/{chunk.get("model")}'
-                self.llm_event.returns = chunk
-                self.llm_event.returns["message"] = self.llm_event.completion
-                self.llm_event.prompt = kwargs["messages"]
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.client.record(self.llm_event)
-
-            if self.llm_event.completion is None:
-                self.llm_event.completion = message
+                llm_event.completion["content"] += message.get("content")
+                llm_event.end_timestamp = get_ISO_time()
+                llm_event.model = f'ollama/{chunk.get("model")}'
+                llm_event.returns = chunk
+                llm_event.returns["message"] = llm_event.completion
+                llm_event.prompt = kwargs["messages"]
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                self.client.record(llm_event)
+
+            if llm_event.completion is None:
+                llm_event.completion = message
             else:
-                self.llm_event.completion["content"] += message.get("content")
+                llm_event.completion["content"] += message.get("content")
 
         if inspect.isgenerator(response):
@@ -48,15 +48,15 @@ def generator():
 
             return generator()
 
-        self.llm_event.end_timestamp = get_ISO_time()
+        llm_event.end_timestamp = get_ISO_time()
 
-        self.llm_event.model = f'ollama/{response["model"]}'
-        self.llm_event.returns = response
-        self.llm_event.agent_id = check_call_stack_for_agent_id()
-        self.llm_event.prompt = kwargs["messages"]
-        self.llm_event.completion = response["message"]
+        llm_event.model = f'ollama/{response["model"]}'
+        llm_event.returns = response
+        llm_event.agent_id = check_call_stack_for_agent_id()
+        llm_event.prompt = kwargs["messages"]
+        llm_event.completion = response["message"]
 
-        self._safe_record(session, self.llm_event)
+        self._safe_record(session, llm_event)
         return response
 
     def override(self):
diff --git a/agentops/llms/openai.py b/agentops/llms/openai.py
index 0fd31a1d..c99523d7 100644
--- a/agentops/llms/openai.py
+++ b/agentops/llms/openai.py
@@ -30,21 +30,21 @@ def handle_response(
         from openai.resources import AsyncCompletions
         from openai.types.chat import ChatCompletionChunk
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: ChatCompletionChunk):
             # NOTE: prompt/completion usage not returned in response when streaming
             # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
-            if self.llm_event.returns == None:
-                self.llm_event.returns = chunk
+            if llm_event.returns == None:
+                llm_event.returns = chunk
 
             try:
-                accumulated_delta = self.llm_event.returns.choices[0].delta
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = chunk.model
-                self.llm_event.prompt = kwargs["messages"]
+                accumulated_delta = llm_event.returns.choices[0].delta
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = chunk.model
+                llm_event.prompt = kwargs["messages"]
 
                 # NOTE: We assume for completion only choices[0] is relevant
                 choice = chunk.choices[0]
@@ -63,21 +63,19 @@ def handle_stream_chunk(chunk: ChatCompletionChunk):
 
                 if choice.finish_reason:
                     # Streaming is done. Record LLMEvent
-                    self.llm_event.returns.choices[0].finish_reason = (
-                        choice.finish_reason
-                    )
-                    self.llm_event.completion = {
+                    llm_event.returns.choices[0].finish_reason = choice.finish_reason
+                    llm_event.completion = {
                         "role": accumulated_delta.role,
                         "content": accumulated_delta.content,
                         "function_call": accumulated_delta.function_call,
                         "tool_calls": accumulated_delta.tool_calls,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
+                    llm_event.end_timestamp = get_ISO_time()
 
-                    self._safe_record(session, self.llm_event)
+                    self._safe_record(session, llm_event)
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
                 kwargs_str = pprint.pformat(kwargs)
@@ -120,19 +118,17 @@ async def async_generator():
 
         # v1.0.0+ responses are objects
         try:
-            self.llm_event.returns = response
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.prompt_tokens
-            self.llm_event.completion = response.choices[0].message.model_dump()
-            self.llm_event.completion_tokens = response.usage.completion_tokens
-            self.llm_event.model = response.model
-
-            self._safe_record(session, self.llm_event)
+            llm_event.returns = response
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.prompt_tokens
+            llm_event.completion = response.choices[0].message.model_dump()
+            llm_event.completion_tokens = response.usage.completion_tokens
+            llm_event.model = response.model
+
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
 
             kwargs_str = pprint.pformat(kwargs)
             response = pprint.pformat(response)
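The common change across every provider above is that the in-flight `LLMEvent` moves from an instance attribute (`self.llm_event`) to a local variable captured by the stream-chunk handlers, so overlapping calls through the same provider instance no longer write into one shared event. The snippet below is a minimal, hypothetical sketch of that failure mode and of the fix; `SharedEventProvider`, `LocalEventProvider`, and their `stream()` method are invented for illustration and are not part of the AgentOps API.

```python
class SharedEventProvider:
    """Pre-refactor style: the in-progress event lives on the instance."""

    def stream(self, prompt, chunks):
        self.llm_event = {"prompt": prompt, "completion": ""}

        def consume():
            for chunk in chunks:
                # Closes over `self`, so if another stream() call has started
                # in the meantime, these chunks land in *that* call's event.
                self.llm_event["completion"] += chunk
                yield chunk

        return consume()


class LocalEventProvider:
    """Post-refactor style: the in-progress event is a per-call local."""

    def stream(self, prompt, chunks):
        llm_event = {"prompt": prompt, "completion": ""}

        def consume():
            for chunk in chunks:
                # Closes over this call's own llm_event, so interleaved
                # streams cannot overwrite each other.
                llm_event["completion"] += chunk
                yield chunk

        return consume()


if __name__ == "__main__":
    shared = SharedEventProvider()
    first = shared.stream("prompt A", ["A1", "A2"])
    second = shared.stream("prompt B", ["B1", "B2"])  # rebinds self.llm_event
    list(first), list(second)
    print(shared.llm_event["completion"])  # "A1A2B1B2" -- both streams mixed into one event

    local = LocalEventProvider()
    first = local.stream("prompt A", ["A1", "A2"])
    second = local.stream("prompt B", ["B1", "B2"])
    list(first), list(second)  # each closure keeps its own, correct event
```

With the instance attribute, the first call's completion is lost and the second event records text from both streams; with the local, each `handle_stream_chunk`-style closure writes only to the event created for its own call, which is what the refactor achieves for the real providers.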