Fixing duplicate llm_pkey errors (#389)
* Fixed for multithreading and clarity

* don't need threading

* leftover line

* fixed for threading

* saving state. will replace with a better solution

* removing self from llm_event in handle_response llm tracker logic due to a multithreading issue (see the sketch after the changed-files summary below)

* cleaning up time travel output

* removing unused line

---------

Co-authored-by: Shawn Qiu <[email protected]>
HowieG and siyangqiu authored Sep 17, 2024
1 parent 7b5d694 commit fecf20c
Showing 7 changed files with 135 additions and 152 deletions.
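The pattern repeated in every file below is the same: the shared instance attribute `self.llm_event` becomes a call-local `llm_event` that `handle_response` creates and that the nested `handle_stream_chunk` closure captures. With the shared attribute, two calls running on different threads could overwrite each other's in-flight event and end up recording the same event twice, which is presumably what surfaced as the duplicate llm_pkey errors this PR fixes. Below is a minimal sketch of the failure mode and the fix; the `Event` dataclass and the two tracker classes are illustrative stand-ins, not the real AgentOps `LLMEvent` or provider API.

```python
import threading
import uuid
from dataclasses import dataclass, field


@dataclass
class Event:
    """Stand-in for LLMEvent: each instance gets its own id (the "pkey")."""

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    completion: str = ""


class SharedStateTracker:
    """Old pattern: one event stored on the instance, shared by every thread."""

    def handle_response(self, chunks, recorded):
        self.llm_event = Event()  # racing calls overwrite this attribute

        def handle_stream_chunk(chunk):
            # may now be accumulating into another call's event
            self.llm_event.completion += chunk

        for chunk in chunks:
            handle_stream_chunk(chunk)
        recorded.append(self.llm_event.id)  # two calls can record the same id


class LocalStateTracker:
    """New pattern: each call builds its own event; the closure captures it."""

    def handle_response(self, chunks, recorded):
        llm_event = Event()  # local to this call, never shared

        def handle_stream_chunk(chunk):
            llm_event.completion += chunk

        for chunk in chunks:
            handle_stream_chunk(chunk)
        recorded.append(llm_event.id)


def distinct_ids_recorded(tracker, calls=8):
    """Run several concurrent "responses" and count how many distinct events were recorded."""
    recorded = []
    threads = [
        threading.Thread(target=tracker.handle_response, args=(["a", "b", "c"], recorded))
        for _ in range(calls)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(set(recorded))


print("shared attribute:", distinct_ids_recorded(SharedStateTracker()))  # can be < 8 when calls overlap
print("local variable:  ", distinct_ids_recorded(LocalStateTracker()))   # always 8
```

On any given run the shared-state version may or may not lose events, since the race only triggers when calls overlap; the call-local version records exactly one distinct event per call regardless of scheduling.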
1 change: 0 additions & 1 deletion agentops/llms/__init__.py
@@ -43,7 +43,6 @@ class LlmTracker:
 
     def __init__(self, client):
         self.client = client
-        self.completion = ""
 
     def override_api(self):
         """
50 changes: 24 additions & 26 deletions agentops/llms/anthropic.py
@@ -31,27 +31,27 @@ def handle_response(
         from anthropic.resources import AsyncMessages
         from anthropic.types import Message
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: Message):
             try:
                 # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion
                 if chunk.type == "message_start":
-                    self.llm_event.returns = chunk
-                    self.llm_event.agent_id = check_call_stack_for_agent_id()
-                    self.llm_event.model = kwargs["model"]
-                    self.llm_event.prompt = kwargs["messages"]
-                    self.llm_event.prompt_tokens = chunk.message.usage.input_tokens
-                    self.llm_event.completion = {
+                    llm_event.returns = chunk
+                    llm_event.agent_id = check_call_stack_for_agent_id()
+                    llm_event.model = kwargs["model"]
+                    llm_event.prompt = kwargs["messages"]
+                    llm_event.prompt_tokens = chunk.message.usage.input_tokens
+                    llm_event.completion = {
                         "role": chunk.message.role,
                         "content": "",  # Always returned as [] in this instance type
                     }
 
                 elif chunk.type == "content_block_start":
                     if chunk.content_block.type == "text":
-                        self.llm_event.completion["content"] += chunk.content_block.text
+                        llm_event.completion["content"] += chunk.content_block.text
 
                     elif chunk.content_block.type == "tool_use":
                         self.tool_id = chunk.content_block.id
@@ -62,7 +62,7 @@ def handle_stream_chunk(chunk: Message):
 
                 elif chunk.type == "content_block_delta":
                     if chunk.delta.type == "text_delta":
-                        self.llm_event.completion["content"] += chunk.delta.text
+                        llm_event.completion["content"] += chunk.delta.text
 
                     elif chunk.delta.type == "input_json_delta":
                         self.tool_event[self.tool_id].logs[
@@ -73,15 +73,15 @@ def handle_stream_chunk(chunk: Message):
                     pass
 
                 elif chunk.type == "message_delta":
-                    self.llm_event.completion_tokens = chunk.usage.output_tokens
+                    llm_event.completion_tokens = chunk.usage.output_tokens
 
                 elif chunk.type == "message_stop":
-                    self.llm_event.end_timestamp = get_ISO_time()
-                    self._safe_record(session, self.llm_event)
+                    llm_event.end_timestamp = get_ISO_time()
+                    self._safe_record(session, llm_event)
 
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
         kwargs_str = pprint.pformat(kwargs)
@@ -124,23 +124,21 @@ async def async_generator():
 
         # Handle object responses
         try:
-            self.llm_event.returns = response.model_dump()
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.input_tokens
-            self.llm_event.completion = {
+            llm_event.returns = response.model_dump()
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.input_tokens
+            llm_event.completion = {
                 "role": "assistant",
                 "content": response.content[0].text,
             }
-            self.llm_event.completion_tokens = response.usage.output_tokens
-            self.llm_event.model = response.model
-            self.llm_event.end_timestamp = get_ISO_time()
+            llm_event.completion_tokens = response.usage.output_tokens
+            llm_event.model = response.model
+            llm_event.end_timestamp = get_ISO_time()
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
         kwargs_str = pprint.pformat(kwargs)
         response = pprint.pformat(response)
         logger.warning(
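The streaming branches above, and the Groq/LiteLLM trackers later in the diff, all follow the pattern the inline comment describes: take the first chunk as the skeleton of the event, then fold every subsequent delta into one completion and record it when the stream ends. A rough, provider-agnostic illustration of that fold; the `Chunk` shape and `accumulate` helper are invented for the example and are not any SDK's real types.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Chunk:
    """Generic stand-in for a provider streaming chunk (not a real SDK type)."""

    type: str
    text: str = ""
    output_tokens: Optional[int] = None


def accumulate(chunks: List[Chunk]) -> dict:
    """Fold a stream of chunks into one completion, mirroring handle_stream_chunk."""
    completion = {"role": "assistant", "content": ""}
    completion_tokens = 0
    for chunk in chunks:
        if chunk.type == "delta":    # analogous to text_delta / content_block_delta
            completion["content"] += chunk.text
        elif chunk.type == "stop":   # analogous to message_delta + message_stop
            completion_tokens = chunk.output_tokens or 0
    return {"completion": completion, "completion_tokens": completion_tokens}


stream = [Chunk("delta", "Hel"), Chunk("delta", "lo"), Chunk("stop", output_tokens=2)]
print(accumulate(stream))
# {'completion': {'role': 'assistant', 'content': 'Hello'}, 'completion_tokens': 2}
```

The real trackers also stash the first chunk on `llm_event.returns` and pull token counts from the provider's usage fields, but the accumulation step is the core of the pattern.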
46 changes: 22 additions & 24 deletions agentops/llms/cohere.py
@@ -52,32 +52,32 @@ def handle_response(
 
         # from cohere.types.chat import ChatGenerationChunk
         # NOTE: Cohere only returns one message and its role will be CHATBOT which we are coercing to "assistant"
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         self.action_events = {}
 
         def handle_stream_chunk(chunk, session: Optional[Session] = None):
 
             # We take the first chunk and accumulate the deltas from all subsequent chunks to build one full chat completion
             if isinstance(chunk, StreamedChatResponse_StreamStart):
-                self.llm_event.returns = chunk
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = kwargs.get("model", "command-r-plus")
-                self.llm_event.prompt = kwargs["message"]
-                self.llm_event.completion = ""
+                llm_event.returns = chunk
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = kwargs.get("model", "command-r-plus")
+                llm_event.prompt = kwargs["message"]
+                llm_event.completion = ""
                 return
 
             try:
                 if isinstance(chunk, StreamedChatResponse_StreamEnd):
                     # StreamedChatResponse_TextGeneration = LLMEvent
-                    self.llm_event.completion = {
+                    llm_event.completion = {
                         "role": "assistant",
                         "content": chunk.response.text,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
-                    self._safe_record(session, self.llm_event)
+                    llm_event.end_timestamp = get_ISO_time()
+                    self._safe_record(session, llm_event)
 
                     # StreamedChatResponse_SearchResults = ActionEvent
                     search_results = chunk.response.search_results
Expand Down Expand Up @@ -115,7 +115,7 @@ def handle_stream_chunk(chunk, session: Optional[Session] = None):
self._safe_record(session, action_event)

elif isinstance(chunk, StreamedChatResponse_TextGeneration):
self.llm_event.completion += chunk.text
llm_event.completion += chunk.text
elif isinstance(chunk, StreamedChatResponse_ToolCallsGeneration):
pass
elif isinstance(chunk, StreamedChatResponse_CitationGeneration):
@@ -139,7 +139,7 @@ def handle_stream_chunk(chunk, session: Optional[Session] = None):
 
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
         kwargs_str = pprint.pformat(kwargs)
@@ -175,35 +175,33 @@ def generator():
         # Not enough to record StreamedChatResponse_ToolCallsGeneration because the tool may have not gotten called
 
         try:
-            self.llm_event.returns = response
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = []
+            llm_event.returns = response
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = []
             if response.chat_history:
                 role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"}
 
                 for i in range(len(response.chat_history) - 1):
                     message = response.chat_history[i]
-                    self.llm_event.prompt.append(
+                    llm_event.prompt.append(
                         {
                             "role": role_map.get(message.role, message.role),
                             "content": message.message,
                         }
                     )
 
                 last_message = response.chat_history[-1]
-                self.llm_event.completion = {
+                llm_event.completion = {
                     "role": role_map.get(last_message.role, last_message.role),
                     "content": last_message.message,
                 }
-            self.llm_event.prompt_tokens = response.meta.tokens.input_tokens
-            self.llm_event.completion_tokens = response.meta.tokens.output_tokens
-            self.llm_event.model = kwargs.get("model", "command-r-plus")
+            llm_event.prompt_tokens = response.meta.tokens.input_tokens
+            llm_event.completion_tokens = response.meta.tokens.output_tokens
+            llm_event.model = kwargs.get("model", "command-r-plus")
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
         kwargs_str = pprint.pformat(kwargs)
         response = pprint.pformat(response)
         logger.warning(
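The non-streaming Cohere branch above rebuilds the prompt from `response.chat_history`, coercing Cohere's uppercase role names to OpenAI-style ones and treating the final history entry as the completion. A small standalone sketch of that mapping, using plain dicts instead of Cohere's response objects:

```python
# Stand-in for response.chat_history as returned by Cohere's chat API (illustrative only).
chat_history = [
    {"role": "SYSTEM", "message": "You are terse."},
    {"role": "USER", "message": "Name a prime."},
    {"role": "CHATBOT", "message": "7"},
]

role_map = {"USER": "user", "CHATBOT": "assistant", "SYSTEM": "system"}

# Everything except the last entry becomes the prompt...
prompt = [
    {"role": role_map.get(m["role"], m["role"]), "content": m["message"]}
    for m in chat_history[:-1]
]

# ...and the last CHATBOT entry is coerced to the "assistant" completion.
last = chat_history[-1]
completion = {"role": role_map.get(last["role"], last["role"]), "content": last["message"]}

print(prompt)
# [{'role': 'system', 'content': 'You are terse.'}, {'role': 'user', 'content': 'Name a prime.'}]
print(completion)
# {'role': 'assistant', 'content': '7'}
```

Note that `role_map.get(role, role)` keeps any unexpected role value as-is instead of raising.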
50 changes: 23 additions & 27 deletions agentops/llms/groq.py
@@ -37,21 +37,21 @@ def handle_response(
         from groq.resources.chat import AsyncCompletions
         from groq.types.chat import ChatCompletionChunk
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: ChatCompletionChunk):
             # NOTE: prompt/completion usage not returned in response when streaming
             # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
-            if self.llm_event.returns == None:
-                self.llm_event.returns = chunk
+            if llm_event.returns == None:
+                llm_event.returns = chunk
 
             try:
-                accumulated_delta = self.llm_event.returns.choices[0].delta
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = chunk.model
-                self.llm_event.prompt = kwargs["messages"]
+                accumulated_delta = llm_event.returns.choices[0].delta
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = chunk.model
+                llm_event.prompt = kwargs["messages"]
 
                 # NOTE: We assume for completion only choices[0] is relevant
                 choice = chunk.choices[0]
@@ -70,21 +70,19 @@ def handle_stream_chunk(chunk: ChatCompletionChunk):
 
                 if choice.finish_reason:
                     # Streaming is done. Record LLMEvent
-                    self.llm_event.returns.choices[0].finish_reason = (
-                        choice.finish_reason
-                    )
-                    self.llm_event.completion = {
+                    llm_event.returns.choices[0].finish_reason = choice.finish_reason
+                    llm_event.completion = {
                         "role": accumulated_delta.role,
                         "content": accumulated_delta.content,
                         "function_call": accumulated_delta.function_call,
                         "tool_calls": accumulated_delta.tool_calls,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
+                    llm_event.end_timestamp = get_ISO_time()
 
-                    self._safe_record(session, self.llm_event)
+                    self._safe_record(session, llm_event)
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
         kwargs_str = pprint.pformat(kwargs)
@@ -127,19 +125,17 @@ async def async_generator():
 
         # v1.0.0+ responses are objects
         try:
-            self.llm_event.returns = response.model_dump()
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.prompt_tokens
-            self.llm_event.completion = response.choices[0].message.model_dump()
-            self.llm_event.completion_tokens = response.usage.completion_tokens
-            self.llm_event.model = response.model
+            llm_event.returns = response.model_dump()
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.prompt_tokens
+            llm_event.completion = response.choices[0].message.model_dump()
+            llm_event.completion_tokens = response.usage.completion_tokens
+            llm_event.model = response.model
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
 
         kwargs_str = pprint.pformat(kwargs)
         response = pprint.pformat(response)
50 changes: 23 additions & 27 deletions agentops/llms/litellm.py
@@ -49,21 +49,21 @@ def handle_response(
         from openai.types.chat import ChatCompletionChunk
         from litellm.utils import CustomStreamWrapper
 
-        self.llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
+        llm_event = LLMEvent(init_timestamp=init_timestamp, params=kwargs)
         if session is not None:
-            self.llm_event.session_id = session.session_id
+            llm_event.session_id = session.session_id
 
         def handle_stream_chunk(chunk: ChatCompletionChunk):
            # NOTE: prompt/completion usage not returned in response when streaming
            # We take the first ChatCompletionChunk and accumulate the deltas from all subsequent chunks to build one full chat completion
-            if self.llm_event.returns == None:
-                self.llm_event.returns = chunk
+            if llm_event.returns == None:
+                llm_event.returns = chunk
 
             try:
-                accumulated_delta = self.llm_event.returns.choices[0].delta
-                self.llm_event.agent_id = check_call_stack_for_agent_id()
-                self.llm_event.model = chunk.model
-                self.llm_event.prompt = kwargs["messages"]
+                accumulated_delta = llm_event.returns.choices[0].delta
+                llm_event.agent_id = check_call_stack_for_agent_id()
+                llm_event.model = chunk.model
+                llm_event.prompt = kwargs["messages"]
 
                 # NOTE: We assume for completion only choices[0] is relevant
                 choice = chunk.choices[0]
@@ -82,21 +82,19 @@ def handle_stream_chunk(chunk: ChatCompletionChunk):
 
                 if choice.finish_reason:
                     # Streaming is done. Record LLMEvent
-                    self.llm_event.returns.choices[0].finish_reason = (
-                        choice.finish_reason
-                    )
-                    self.llm_event.completion = {
+                    llm_event.returns.choices[0].finish_reason = choice.finish_reason
+                    llm_event.completion = {
                         "role": accumulated_delta.role,
                         "content": accumulated_delta.content,
                         "function_call": accumulated_delta.function_call,
                         "tool_calls": accumulated_delta.tool_calls,
                     }
-                    self.llm_event.end_timestamp = get_ISO_time()
+                    llm_event.end_timestamp = get_ISO_time()
 
-                    self._safe_record(session, self.llm_event)
+                    self._safe_record(session, llm_event)
             except Exception as e:
                 self._safe_record(
-                    session, ErrorEvent(trigger_event=self.llm_event, exception=e)
+                    session, ErrorEvent(trigger_event=llm_event, exception=e)
                 )
 
         kwargs_str = pprint.pformat(kwargs)
@@ -149,19 +147,17 @@ async def async_generator():
 
         # v1.0.0+ responses are objects
         try:
-            self.llm_event.returns = response
-            self.llm_event.agent_id = check_call_stack_for_agent_id()
-            self.llm_event.prompt = kwargs["messages"]
-            self.llm_event.prompt_tokens = response.usage.prompt_tokens
-            self.llm_event.completion = response.choices[0].message.model_dump()
-            self.llm_event.completion_tokens = response.usage.completion_tokens
-            self.llm_event.model = response.model
+            llm_event.returns = response
+            llm_event.agent_id = check_call_stack_for_agent_id()
+            llm_event.prompt = kwargs["messages"]
+            llm_event.prompt_tokens = response.usage.prompt_tokens
+            llm_event.completion = response.choices[0].message.model_dump()
+            llm_event.completion_tokens = response.usage.completion_tokens
+            llm_event.model = response.model
 
-            self._safe_record(session, self.llm_event)
+            self._safe_record(session, llm_event)
         except Exception as e:
-            self._safe_record(
-                session, ErrorEvent(trigger_event=self.llm_event, exception=e)
-            )
+            self._safe_record(session, ErrorEvent(trigger_event=llm_event, exception=e))
 
         kwargs_str = pprint.pformat(kwargs)
         response = pprint.pformat(response)